1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import sys
12import threading
13import warnings
14from collections import abc, defaultdict
15from collections.abc import Sequence
16from functools import lru_cache
17from random import shuffle
18from threading import RLock
19from typing import (
20 TYPE_CHECKING,
21 Any,
22 Callable,
23 ClassVar,
24 Generic,
25 Literal,
26 Optional,
27 TypeVar,
28 Union,
29 cast,
30 overload,
31)
32
33from hypothesis._settings import HealthCheck, Phase, Verbosity, settings
34from hypothesis.control import _current_build_context, current_build_context
35from hypothesis.errors import (
36 HypothesisException,
37 HypothesisWarning,
38 InvalidArgument,
39 NonInteractiveExampleWarning,
40 UnsatisfiedAssumption,
41)
42from hypothesis.internal.conjecture import utils as cu
43from hypothesis.internal.conjecture.data import ConjectureData
44from hypothesis.internal.conjecture.utils import (
45 calc_label_from_cls,
46 calc_label_from_hash,
47 calc_label_from_name,
48 combine_labels,
49)
50from hypothesis.internal.coverage import check_function
51from hypothesis.internal.reflection import (
52 get_pretty_function_description,
53 is_identity_function,
54)
55from hypothesis.strategies._internal.utils import defines_strategy
56from hypothesis.utils.conventions import UniqueIdentifier
57
58if TYPE_CHECKING:
59 from typing import TypeAlias
60
61 Ex = TypeVar("Ex", covariant=True, default=Any)
62else:
63 Ex = TypeVar("Ex", covariant=True)
64
65T = TypeVar("T")
66T3 = TypeVar("T3")
67T4 = TypeVar("T4")
68T5 = TypeVar("T5")
69MappedFrom = TypeVar("MappedFrom")
70MappedTo = TypeVar("MappedTo")
71RecurT: "TypeAlias" = Callable[["SearchStrategy"], bool]
72calculating = UniqueIdentifier("calculating")
73
74MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
75 "another attempted draw in MappedStrategy"
76)
77
78FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
79 "single loop iteration in FilteredStrategy"
80)
81
82label_lock = RLock()
83
84
85def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any:
86 """Handle properties which may be mutually recursive among a set of
87 strategies.
88
89 These are essentially lazily cached properties, with the ability to set
90 an override: If the property has not been explicitly set, we calculate
91 it on first access and memoize the result for later.
92
93 The problem is that for properties that depend on each other, a naive
94 calculation strategy may hit infinite recursion. Consider for example
95 the property is_empty. A strategy defined as x = st.deferred(lambda: x)
96 is certainly empty (in order to draw a value from x we would have to
97 draw a value from x, for which we would have to draw a value from x,
98 ...), but in order to calculate it the naive approach would end up
99 calling x.is_empty in order to calculate x.is_empty in order to etc.
100
101 The solution is one of fixed point calculation. We start with a default
102 value that is the value of the property in the absence of evidence to
103 the contrary, and then update the values of the property for all
104 dependent strategies until we reach a fixed point.
105
106 The approach taken roughly follows that in section 4.2 of Adams,
107 Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
108 and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
109 (2016): 224-236.
110 """
111 assert name in {"is_empty", "has_reusable_values", "is_cacheable"}
112 cache_key = "cached_" + name
113 calculation = "calc_" + name
114 force_key = "force_" + name
115
116 def forced_value(target: SearchStrategy) -> Any:
117 try:
118 return getattr(target, force_key)
119 except AttributeError:
120 return getattr(target, cache_key)
121
122 try:
123 return forced_value(strategy)
124 except AttributeError:
125 pass
126
127 mapping: dict[SearchStrategy, Any] = {}
128 sentinel = object()
129 hit_recursion = False
130
131 # For a first pass we do a direct recursive calculation of the
132 # property, but we block recursively visiting a value in the
133 # computation of its property: When that happens, we simply
134 # note that it happened and return the default value.
135 def recur(strat: SearchStrategy) -> Any:
136 nonlocal hit_recursion
137 try:
138 return forced_value(strat)
139 except AttributeError:
140 pass
141 result = mapping.get(strat, sentinel)
142 if result is calculating:
143 hit_recursion = True
144 return default
145 elif result is sentinel:
146 mapping[strat] = calculating
147 mapping[strat] = getattr(strat, calculation)(recur)
148 return mapping[strat]
149 return result
150
151 recur(strategy)
152
153 # If we hit self-recursion in the computation of any strategy
154 # value, our mapping at the end is imprecise - it may or may
155 # not have the right values in it. We now need to proceed with
156 # a more careful fixed point calculation to get the exact
157 # values. Hopefully our mapping is still pretty good and it
158 # won't take a large number of updates to reach a fixed point.
159 if hit_recursion:
160 needs_update = set(mapping)
161
162 # We track which strategies use which in the course of
163 # calculating their property value. If A ever uses B in
164 # the course of calculating its value, then whenever the
165 # value of B changes we might need to update the value of
166 # A.
167 listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set)
168 else:
169 needs_update = None
170
171 def recur2(strat: SearchStrategy) -> Any:
172 def recur_inner(other: SearchStrategy) -> Any:
173 try:
174 return forced_value(other)
175 except AttributeError:
176 pass
177 listeners[other].add(strat)
178 result = mapping.get(other, sentinel)
179 if result is sentinel:
180 assert needs_update is not None
181 needs_update.add(other)
182 mapping[other] = default
183 return default
184 return result
185
186 return recur_inner
187
188 count = 0
189 seen = set()
190 while needs_update:
191 count += 1
192 # If we seem to be taking a really long time to stabilize we
193 # start tracking seen values to attempt to detect an infinite
194 # loop. This should be impossible, and most code will never
195 # hit the count, but having an assertion for it means that
196 # testing is easier to debug and we don't just have a hung
197 # test.
198 # Note: This is actually covered, by test_very_deep_deferral
199 # in tests/cover/test_deferred_strategies.py. Unfortunately it
200 # runs into a coverage bug. See
201 # https://github.com/nedbat/coveragepy/issues/605
202 # for details.
203 if count > 50: # pragma: no cover
204 key = frozenset(mapping.items())
205 assert key not in seen, (key, name)
206 seen.add(key)
207 to_update = needs_update
208 needs_update = set()
209 for strat in to_update:
210 new_value = getattr(strat, calculation)(recur2(strat))
211 if new_value != mapping[strat]:
212 needs_update.update(listeners[strat])
213 mapping[strat] = new_value
214
215 # We now have a complete and accurate calculation of the
216 # property values for everything we have seen in the course of
217 # running this calculation. We simultaneously update all of
218 # them (not just the strategy we started out with).
219 for k, v in mapping.items():
220 setattr(k, cache_key, v)
221 return getattr(strategy, cache_key)
222
223
224class SearchStrategy(Generic[Ex]):
225 """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input.
226
227 This class is only part of the public API for use in type annotations, so that
228 you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns
229 ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class.
230 """
231
232 __module__: str = "hypothesis.strategies"
233 LABELS: ClassVar[dict[type, int]] = {}
234 # triggers `assert isinstance(label, int)` under threading when setting this
235 # in init instead of a classvar. I'm not sure why, init should be safe. But
236 # this works so I'm not looking into it further atm.
237 __label: Union[int, UniqueIdentifier, None] = None
238
239 def __init__(self):
240 self.validate_called: dict[int, bool] = {}
241
242 def _available(self, data: ConjectureData) -> bool:
243 """Returns whether this strategy can *currently* draw any
244 values. This typically useful for stateful testing where ``Bundle``
245 grows over time a list of value to choose from.
246
247 Unlike ``empty`` property, this method's return value may change
248 over time.
249 Note: ``data`` parameter will only be used for introspection and no
250 value drawn from it.
251 """
252 return not self.is_empty
253
254 @property
255 def is_empty(self) -> Any:
256 # Returns True if this strategy can never draw a value and will always
257 # result in the data being marked invalid.
258 # The fact that this returns False does not guarantee that a valid value
259 # can be drawn - this is not intended to be perfect, and is primarily
260 # intended to be an optimisation for some cases.
261 return recursive_property(self, "is_empty", True)
262
263 @property
264 def supports_find(self) -> bool:
265 return True
266
267 # Returns True if values from this strategy can safely be reused without
268 # this causing unexpected behaviour.
269
270 # True if values from this strategy can be implicitly reused (e.g. as
271 # background values in a numpy array) without causing surprising
272 # user-visible behaviour. Should be false for built-in strategies that
273 # produce mutable values, and for strategies that have been mapped/filtered
274 # by arbitrary user-provided functions.
275 @property
276 def has_reusable_values(self) -> Any:
277 return recursive_property(self, "has_reusable_values", True)
278
279 # Whether this strategy is suitable for holding onto in a cache.
280 @property
281 def is_cacheable(self) -> Any:
282 return recursive_property(self, "is_cacheable", True)
283
284 def calc_is_cacheable(self, recur: RecurT) -> bool:
285 return True
286
287 def calc_is_empty(self, recur: RecurT) -> bool:
288 # Note: It is correct and significant that the default return value
289 # from calc_is_empty is False despite the default value for is_empty
290 # being true. The reason for this is that strategies should be treated
291 # as empty absent evidence to the contrary, but most basic strategies
292 # are trivially non-empty and it would be annoying to have to override
293 # this method to show that.
294 return False
295
296 def calc_has_reusable_values(self, recur: RecurT) -> bool:
297 return False
298
299 def example(self) -> Ex: # FIXME
300 """Provide an example of the sort of value that this strategy generates.
301
302 This method is designed for use in a REPL, and will raise an error if
303 called from inside |@given| or a strategy definition. For serious use,
304 see |@composite| or |st.data|.
305 """
306 if getattr(sys, "ps1", None) is None: # pragma: no branch
307 # The other branch *is* covered in cover/test_examples.py; but as that
308 # uses `pexpect` for an interactive session `coverage` doesn't see it.
309 warnings.warn(
310 "The `.example()` method is good for exploring strategies, but should "
311 "only be used interactively. We recommend using `@given` for tests - "
312 "it performs better, saves and replays failures to avoid flakiness, "
313 f"and reports minimal examples. (strategy: {self!r})",
314 NonInteractiveExampleWarning,
315 stacklevel=2,
316 )
317
318 context = _current_build_context.value
319 if context is not None:
320 if context.data is not None and context.data.depth > 0:
321 raise HypothesisException(
322 "Using example() inside a strategy definition is a bad "
323 "idea. Instead consider using hypothesis.strategies.builds() "
324 "or @hypothesis.strategies.composite to define your strategy."
325 " See https://hypothesis.readthedocs.io/en/latest/data.html"
326 "#hypothesis.strategies.builds or "
327 "https://hypothesis.readthedocs.io/en/latest/data.html"
328 "#composite-strategies for more details."
329 )
330 else:
331 raise HypothesisException(
332 "Using example() inside a test function is a bad "
333 "idea. Instead consider using hypothesis.strategies.data() "
334 "to draw more examples during testing. See "
335 "https://hypothesis.readthedocs.io/en/latest/data.html"
336 "#drawing-interactively-in-tests for more details."
337 )
338
339 try:
340 return self.__examples.pop()
341 except (AttributeError, IndexError):
342 self.__examples: list[Ex] = []
343
344 from hypothesis.core import given
345
346 # Note: this function has a weird name because it might appear in
347 # tracebacks, and we want users to know that they can ignore it.
348 @given(self)
349 @settings(
350 database=None,
351 # generate only a few examples at a time to avoid slow interactivity
352 # for large strategies. The overhead of @given is very small relative
353 # to generation, so a small batch size is fine.
354 max_examples=10,
355 deadline=None,
356 verbosity=Verbosity.quiet,
357 phases=(Phase.generate,),
358 suppress_health_check=list(HealthCheck),
359 )
360 def example_generating_inner_function(
361 ex: Ex, # type: ignore # mypy is overzealous in preventing covariant params
362 ) -> None:
363 self.__examples.append(ex)
364
365 example_generating_inner_function()
366 shuffle(self.__examples)
367 return self.__examples.pop()
368
369 def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
370 """Returns a new strategy which generates a value from this one, and
371 then returns ``pack(value)``. For example, ``integers().map(str)``
372 could generate ``str(5)`` == ``"5"``.
373 """
374 if is_identity_function(pack):
375 return self # type: ignore # Mypy has no way to know that `Ex == T`
376 return MappedStrategy(self, pack=pack)
377
378 def flatmap(
379 self, expand: Callable[[Ex], "SearchStrategy[T]"]
380 ) -> "SearchStrategy[T]": # FIXME
381 """Old syntax for a special case of |@composite|:
382
383 .. code-block:: python
384
385 @st.composite
386 def flatmap_like(draw, base_strategy, expand):
387 value = draw(base_strategy)
388 new_strategy = expand(value)
389 return draw(new_strategy)
390
391 We find that the greater readability of |@composite| usually outweighs
392 the verbosity, with a few exceptions for simple cases or recipes like
393 ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for
394 any instance of that type, and then generate one of those").
395 """
396 from hypothesis.strategies._internal.flatmapped import FlatMapStrategy
397
398 return FlatMapStrategy(self, expand=expand)
399
400 # Note that we previously had condition extracted to a type alias as
401 # PredicateT. However, that was only useful when not specifying a relationship
402 # between the generic Ts and some other function param / return value.
403 # If we do want to - like here, where we want to say that the Ex arg to condition
404 # is of the same type as the strategy's Ex - then you need to write out the
405 # entire Callable[[Ex], Any] expression rather than use a type alias.
406 # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not
407 # reference the local TypeVar context.
408 def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
409 """Returns a new strategy that generates values from this strategy
410 which satisfy the provided condition.
411
412 Note that if the condition is too hard to satisfy this might result
413 in your tests failing with an Unsatisfiable exception.
414 A basic version of the filtering logic would look something like:
415
416 .. code-block:: python
417
418 @st.composite
419 def filter_like(draw, strategy, condition):
420 for _ in range(3):
421 value = draw(strategy)
422 if condition(value):
423 return value
424 assume(False)
425 """
426 return FilteredStrategy(conditions=(condition,), strategy=self)
427
428 def _filter_for_filtered_draw(
429 self, condition: Callable[[Ex], Any]
430 ) -> "FilteredStrategy[Ex]":
431 # Hook for parent strategies that want to perform fallible filtering
432 # on one of their internal strategies (e.g. UniqueListStrategy).
433 # The returned object must have a `.do_filtered_draw(data)` method
434 # that behaves like `do_draw`, but returns the sentinel object
435 # `filter_not_satisfied` if the condition could not be satisfied.
436
437 # This is separate from the main `filter` method so that strategies
438 # can override `filter` without having to also guarantee a
439 # `do_filtered_draw` method.
440 return FilteredStrategy(conditions=(condition,), strategy=self)
441
442 @property
443 def branches(self) -> Sequence["SearchStrategy[Ex]"]:
444 return [self]
445
446 def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
447 """Return a strategy which produces values by randomly drawing from one
448 of this strategy or the other strategy.
449
450 This method is part of the public API.
451 """
452 if not isinstance(other, SearchStrategy):
453 raise ValueError(f"Cannot | a SearchStrategy with {other!r}")
454
455 # Unwrap explicitly or'd strategies. This turns the
456 # common case of e.g. st.integers() | st.integers() | st.integers() from
457 #
458 # one_of(one_of(integers(), integers()), integers())
459 #
460 # into
461 #
462 # one_of(integers(), integers(), integers())
463 #
464 # This is purely an aesthetic unwrapping, for e.g. reprs. In practice
465 # we use .branches / .element_strategies to get the list of possible
466 # strategies, so this unwrapping is *not* necessary for correctness.
467 strategies: list[SearchStrategy] = []
468 strategies.extend(
469 self.original_strategies if isinstance(self, OneOfStrategy) else [self]
470 )
471 strategies.extend(
472 other.original_strategies if isinstance(other, OneOfStrategy) else [other]
473 )
474 return OneOfStrategy(strategies)
475
476 def __bool__(self) -> bool:
477 warnings.warn(
478 f"bool({self!r}) is always True, did you mean to draw a value?",
479 HypothesisWarning,
480 stacklevel=2,
481 )
482 return True
483
484 def validate(self) -> None:
485 """Throw an exception if the strategy is not valid.
486
487 Strategies should implement ``do_validate``, which is called by this
488 method. They should not override ``validate``.
489
490 This can happen due to invalid arguments, or lazy construction.
491 """
492 thread_id = threading.get_ident()
493 if self.validate_called.get(thread_id, False):
494 return
495 # we need to set validate_called before calling do_validate, for
496 # recursive / deferred strategies. But if a thread switches after
497 # validate_called but before do_validate, we might have a strategy
498 # which does weird things like drawing when do_validate would error but
499 # its params are technically valid (e.g. a param was passed as 1.0
500 # instead of 1) and get into weird internal states.
501 #
502 # There are two ways to fix this.
503 # (1) The first is a per-strategy lock around do_validate. Even though we
504 # expect near-zero lock contention, this still adds the lock overhead.
505 # (2) The second is allowing concurrent .validate calls. Since validation
506 # is (assumed to be) deterministic, both threads will produce the same
507 # end state, so the validation order or race conditions does not matter.
508 #
509 # In order to avoid the lock overhead of (1), we use (2) here. See also
510 # discussion in https://github.com/HypothesisWorks/hypothesis/pull/4473.
511 try:
512 self.validate_called[thread_id] = True
513 self.do_validate()
514 self.is_empty
515 self.has_reusable_values
516 except Exception:
517 self.validate_called[thread_id] = False
518 raise
519
520 @property
521 def class_label(self) -> int:
522 cls = self.__class__
523 try:
524 return cls.LABELS[cls]
525 except KeyError:
526 pass
527 result = calc_label_from_cls(cls)
528 cls.LABELS[cls] = result
529 return result
530
531 @property
532 def label(self) -> int:
533 if isinstance((label := self.__label), int):
534 # avoid locking if we've already completely computed the label.
535 return label
536
537 with label_lock:
538 if self.__label is calculating:
539 return 0
540 self.__label = calculating
541 self.__label = self.calc_label()
542 return self.__label
543
544 def calc_label(self) -> int:
545 return self.class_label
546
547 def do_validate(self) -> None:
548 pass
549
550 def do_draw(self, data: ConjectureData) -> Ex:
551 raise NotImplementedError(f"{type(self).__name__}.do_draw")
552
553
554def _is_hashable(value: object) -> tuple[bool, Optional[int]]:
555 # hashing can be expensive; return the hash value if we compute it, so that
556 # callers don't have to recompute.
557 try:
558 return (True, hash(value))
559 except TypeError:
560 return (False, None)
561
562
563def is_hashable(value: object) -> bool:
564 return _is_hashable(value)[0]
565
566
567class SampledFromStrategy(SearchStrategy[Ex]):
568 """A strategy which samples from a set of elements. This is essentially
569 equivalent to using a OneOfStrategy over Just strategies but may be more
570 efficient and convenient.
571 """
572
573 _MAX_FILTER_CALLS: ClassVar[int] = 10_000
574
575 def __init__(
576 self,
577 elements: Sequence[Ex],
578 *,
579 force_repr: Optional[str] = None,
580 force_repr_braces: Optional[tuple[str, str]] = None,
581 transformations: tuple[
582 tuple[Literal["filter", "map"], Callable[[Ex], Any]],
583 ...,
584 ] = (),
585 ):
586 super().__init__()
587 self.elements = cu.check_sample(elements, "sampled_from")
588 assert self.elements
589 self.force_repr = force_repr
590 self.force_repr_braces = force_repr_braces
591 self._transformations = transformations
592
593 self._cached_repr: Optional[str] = None
594
595 def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]:
596 s = type(self)(
597 self.elements,
598 force_repr=self.force_repr,
599 force_repr_braces=self.force_repr_braces,
600 transformations=(*self._transformations, ("map", pack)),
601 )
602 # guaranteed by the ("map", pack) transformation
603 return cast(SearchStrategy[T], s)
604
605 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
606 return type(self)(
607 self.elements,
608 force_repr=self.force_repr,
609 force_repr_braces=self.force_repr_braces,
610 transformations=(*self._transformations, ("filter", condition)),
611 )
612
613 def __repr__(self):
614 if self._cached_repr is None:
615 rep = get_pretty_function_description
616 elements_s = (
617 ", ".join(rep(v) for v in self.elements[:512]) + ", ..."
618 if len(self.elements) > 512
619 else ", ".join(rep(v) for v in self.elements)
620 )
621 braces = self.force_repr_braces or ("(", ")")
622 instance_s = (
623 self.force_repr or f"sampled_from({braces[0]}{elements_s}{braces[1]})"
624 )
625 transforms_s = "".join(
626 f".{name}({get_pretty_function_description(f)})"
627 for name, f in self._transformations
628 )
629 repr_s = instance_s + transforms_s
630 self._cached_repr = repr_s
631 return self._cached_repr
632
633 def calc_label(self) -> int:
634 # strategy.label is effectively an under-approximation of structural
635 # equality (i.e., some strategies may have the same label when they are not
636 # structurally identical). More importantly for calculating the
637 # SampledFromStrategy label, we might have hash(s1) != hash(s2) even
638 # when s1 and s2 are structurally identical. For instance:
639 #
640 # s1 = st.sampled_from([st.none()])
641 # s2 = st.sampled_from([st.none()])
642 # assert hash(s1) != hash(s2)
643 #
644 # (see also test cases in test_labels.py).
645 #
646 # We therefore use the labels of any component strategies when calculating
647 # our label, and only use the hash if it is not a strategy.
648 #
649 # That's the ideal, anyway. In reality the logic is more complicated than
650 # necessary in order to be efficient in the presence of (very) large sequences:
651 # * add an unabashed special case for range, to avoid iteration over an
652 # enormous range when we know it is entirely integers.
653 # * if there is at least one strategy in self.elements, use strategy label,
654 # and the element hash otherwise.
655 # * if there are no strategies in self.elements, take the hash of the
656 # entire sequence. This prevents worst-case performance of hashing each
657 # element when a hash of the entire sequence would have sufficed.
658 #
659 # The worst case performance of this scheme is
660 # itertools.chain(range(2**100), [st.none()]), where it degrades to
661 # hashing every int in the range.
662 (elements_is_hashable, hash_value) = _is_hashable(self.elements)
663 if isinstance(self.elements, range) or (
664 elements_is_hashable
665 and not any(isinstance(e, SearchStrategy) for e in self.elements)
666 ):
667 return combine_labels(
668 self.class_label, calc_label_from_name(str(hash_value))
669 )
670
671 labels = [self.class_label]
672 for element in self.elements:
673 if not is_hashable(element):
674 continue
675
676 labels.append(
677 element.label
678 if isinstance(element, SearchStrategy)
679 else calc_label_from_hash(element)
680 )
681
682 return combine_labels(*labels)
683
684 def calc_has_reusable_values(self, recur: RecurT) -> bool:
685 # Because our custom .map/.filter implementations skip the normal
686 # wrapper strategies (which would automatically return False for us),
687 # we need to manually return False here if any transformations have
688 # been applied.
689 return not self._transformations
690
691 def calc_is_cacheable(self, recur: RecurT) -> bool:
692 return is_hashable(self.elements)
693
694 def _transform(
695 self,
696 # https://github.com/python/mypy/issues/7049, we're not writing `element`
697 # anywhere in the class so this is still type-safe. mypy is being more
698 # conservative than necessary
699 element: Ex, # type: ignore
700 ) -> Union[Ex, UniqueIdentifier]:
701 # Used in UniqueSampledListStrategy
702 for name, f in self._transformations:
703 if name == "map":
704 result = f(element)
705 if build_context := _current_build_context.value:
706 build_context.record_call(result, f, [element], {})
707 element = result
708 else:
709 assert name == "filter"
710 if not f(element):
711 return filter_not_satisfied
712 return element
713
714 def do_draw(self, data: ConjectureData) -> Ex:
715 result = self.do_filtered_draw(data)
716 if isinstance(result, SearchStrategy) and all(
717 isinstance(x, SearchStrategy) for x in self.elements
718 ):
719 data._sampled_from_all_strategies_elements_message = (
720 "sample_from was given a collection of strategies: "
721 "{!r}. Was one_of intended?",
722 self.elements,
723 )
724 if result is filter_not_satisfied:
725 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
726 assert not isinstance(result, UniqueIdentifier)
727 return result
728
729 def get_element(self, i: int) -> Union[Ex, UniqueIdentifier]:
730 return self._transform(self.elements[i])
731
732 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
733 # Set of indices that have been tried so far, so that we never test
734 # the same element twice during a draw.
735 known_bad_indices: set[int] = set()
736
737 # Start with ordinary rejection sampling. It's fast if it works, and
738 # if it doesn't work then it was only a small amount of overhead.
739 for _ in range(3):
740 i = data.draw_integer(0, len(self.elements) - 1)
741 if i not in known_bad_indices:
742 element = self.get_element(i)
743 if element is not filter_not_satisfied:
744 return element
745 if not known_bad_indices:
746 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
747 known_bad_indices.add(i)
748
749 # If we've tried all the possible elements, give up now.
750 max_good_indices = len(self.elements) - len(known_bad_indices)
751 if not max_good_indices:
752 return filter_not_satisfied
753
754 # Impose an arbitrary cutoff to prevent us from wasting too much time
755 # on very large element lists.
756 max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)
757
758 # Before building the list of allowed indices, speculatively choose
759 # one of them. We don't yet know how many allowed indices there will be,
760 # so this choice might be out-of-bounds, but that's OK.
761 speculative_index = data.draw_integer(0, max_good_indices - 1)
762
763 # Calculate the indices of allowed values, so that we can choose one
764 # of them at random. But if we encounter the speculatively-chosen one,
765 # just use that and return immediately. Note that we also track the
766 # allowed elements, in case of .map(some_stateful_function)
767 allowed: list[tuple[int, Ex]] = []
768 for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
769 if i not in known_bad_indices:
770 element = self.get_element(i)
771 if element is not filter_not_satisfied:
772 assert not isinstance(element, UniqueIdentifier)
773 allowed.append((i, element))
774 if len(allowed) > speculative_index:
775 # Early-exit case: We reached the speculative index, so
776 # we just return the corresponding element.
777 data.draw_integer(0, len(self.elements) - 1, forced=i)
778 return element
779
780 # The speculative index didn't work out, but at this point we've built
781 # and can choose from the complete list of allowed indices and elements.
782 if allowed:
783 i, element = data.choice(allowed)
784 data.draw_integer(0, len(self.elements) - 1, forced=i)
785 return element
786 # If there are no allowed indices, the filter couldn't be satisfied.
787 return filter_not_satisfied
788
789
790class OneOfStrategy(SearchStrategy[Ex]):
791 """Implements a union of strategies. Given a number of strategies this
792 generates values which could have come from any of them.
793
794 The conditional distribution draws uniformly at random from some
795 non-empty subset of these strategies and then draws from the
796 conditional distribution of that strategy.
797 """
798
799 def __init__(self, strategies: Sequence[SearchStrategy[Ex]]):
800 super().__init__()
801 self.original_strategies = tuple(strategies)
802 self.__element_strategies: Optional[Sequence[SearchStrategy[Ex]]] = None
803 self.__in_branches = False
804 self._branches_lock = RLock()
805
806 def calc_is_empty(self, recur: RecurT) -> bool:
807 return all(recur(e) for e in self.original_strategies)
808
809 def calc_has_reusable_values(self, recur: RecurT) -> bool:
810 return all(recur(e) for e in self.original_strategies)
811
812 def calc_is_cacheable(self, recur: RecurT) -> bool:
813 return all(recur(e) for e in self.original_strategies)
814
815 @property
816 def element_strategies(self) -> Sequence[SearchStrategy[Ex]]:
817 if self.__element_strategies is None:
818 # While strategies are hashable, they use object.__hash__ and are
819 # therefore distinguished only by identity.
820 #
821 # In principle we could "just" define a __hash__ method
822 # (and __eq__, but that's easy in terms of type() and hash())
823 # to make this more powerful, but this is harder than it sounds:
824 #
825 # 1. Strategies are often distinguished by non-hashable attributes,
826 # or by attributes that have the same hash value ("^.+" / b"^.+").
827 # 2. LazyStrategy: can't reify the wrapped strategy without breaking
828 # laziness, so there's a hash each for the lazy and the nonlazy.
829 #
830 # Having made several attempts, the minor benefits of making strategies
831 # hashable are simply not worth the engineering effort it would take.
832 # See also issues #2291 and #2327.
833 seen: set[SearchStrategy] = {self}
834 strategies: list[SearchStrategy] = []
835 for arg in self.original_strategies:
836 check_strategy(arg)
837 if not arg.is_empty:
838 for s in arg.branches:
839 if s not in seen and not s.is_empty:
840 seen.add(s)
841 strategies.append(s)
842 self.__element_strategies = strategies
843 return self.__element_strategies
844
845 def calc_label(self) -> int:
846 return combine_labels(
847 self.class_label, *(p.label for p in self.original_strategies)
848 )
849
850 def do_draw(self, data: ConjectureData) -> Ex:
851 strategy = data.draw(
852 SampledFromStrategy(self.element_strategies).filter(
853 lambda s: s._available(data)
854 )
855 )
856 return data.draw(strategy)
857
858 def __repr__(self) -> str:
859 return "one_of({})".format(", ".join(map(repr, self.original_strategies)))
860
861 def do_validate(self) -> None:
862 for e in self.element_strategies:
863 e.validate()
864
865 @property
866 def branches(self) -> Sequence[SearchStrategy[Ex]]:
867 if self.__element_strategies is not None:
868 # common fast path which avoids the lock
869 return self.element_strategies
870
871 with self._branches_lock:
872 if not self.__in_branches:
873 try:
874 self.__in_branches = True
875 return self.element_strategies
876 finally:
877 self.__in_branches = False
878 else:
879 return [self]
880
881 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
882 return FilteredStrategy(
883 OneOfStrategy([s.filter(condition) for s in self.original_strategies]),
884 conditions=(),
885 )
886
887
888@overload
889def one_of(
890 __args: Sequence[SearchStrategy[Ex]],
891) -> SearchStrategy[Ex]: # pragma: no cover
892 ...
893
894
895@overload
896def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]: # pragma: no cover
897 ...
898
899
900@overload
901def one_of(
902 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T]
903) -> SearchStrategy[Union[Ex, T]]: # pragma: no cover
904 ...
905
906
907@overload
908def one_of(
909 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3]
910) -> SearchStrategy[Union[Ex, T, T3]]: # pragma: no cover
911 ...
912
913
914@overload
915def one_of(
916 __a1: SearchStrategy[Ex],
917 __a2: SearchStrategy[T],
918 __a3: SearchStrategy[T3],
919 __a4: SearchStrategy[T4],
920) -> SearchStrategy[Union[Ex, T, T3, T4]]: # pragma: no cover
921 ...
922
923
924@overload
925def one_of(
926 __a1: SearchStrategy[Ex],
927 __a2: SearchStrategy[T],
928 __a3: SearchStrategy[T3],
929 __a4: SearchStrategy[T4],
930 __a5: SearchStrategy[T5],
931) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]: # pragma: no cover
932 ...
933
934
935@overload
936def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]: # pragma: no cover
937 ...
938
939
940@defines_strategy(never_lazy=True)
941def one_of(
942 *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
943) -> SearchStrategy[Any]:
944 # Mypy workaround alert: Any is too loose above; the return parameter
945 # should be the union of the input parameters. Unfortunately, Mypy <=0.600
946 # raises errors due to incompatible inputs instead. See #1270 for links.
947 # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
948 """Return a strategy which generates values from any of the argument
949 strategies.
950
951 This may be called with one iterable argument instead of multiple
952 strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
953 equivalent.
954
955 Examples from this strategy will generally shrink to ones that come from
956 strategies earlier in the list, then shrink according to behaviour of the
957 strategy that produced them. In order to get good shrinking behaviour,
958 try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
959 better than ``one_of(text(), none())``.
960
961 This is especially important when using recursive strategies. e.g.
962 ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
963 but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
964 very badly indeed.
965 """
966 if len(args) == 1 and not isinstance(args[0], SearchStrategy):
967 try:
968 args = tuple(args[0])
969 except TypeError:
970 pass
971 if len(args) == 1 and isinstance(args[0], SearchStrategy):
972 # This special-case means that we can one_of over lists of any size
973 # without incurring any performance overhead when there is only one
974 # strategy, and keeps our reprs simple.
975 return args[0]
976 if args and not any(isinstance(a, SearchStrategy) for a in args):
977 # And this special case is to give a more-specific error message if it
978 # seems that the user has confused `one_of()` for `sampled_from()`;
979 # the remaining validation is left to OneOfStrategy. See PR #2627.
980 raise InvalidArgument(
981 f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used "
982 "to combine strategies, but all of the arguments were of other types."
983 )
984 # we've handled the case where args is a one-element sequence [(s1, s2, ...)]
985 # above, so we can assume it's an actual sequence of strategies.
986 args = cast(Sequence[SearchStrategy], args)
987 return OneOfStrategy(args)
988
989
990class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]):
991 """A strategy which is defined purely by conversion to and from another
992 strategy.
993
994 Its parameter and distribution come from that other strategy.
995 """
996
997 def __init__(
998 self,
999 strategy: SearchStrategy[MappedFrom],
1000 pack: Callable[[MappedFrom], MappedTo],
1001 ) -> None:
1002 super().__init__()
1003 self.mapped_strategy = strategy
1004 self.pack = pack
1005
1006 def calc_is_empty(self, recur: RecurT) -> bool:
1007 return recur(self.mapped_strategy)
1008
1009 def calc_is_cacheable(self, recur: RecurT) -> bool:
1010 return recur(self.mapped_strategy)
1011
1012 def __repr__(self) -> str:
1013 if not hasattr(self, "_cached_repr"):
1014 self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})"
1015 return self._cached_repr
1016
1017 def do_validate(self) -> None:
1018 self.mapped_strategy.validate()
1019
1020 def do_draw(self, data: ConjectureData) -> MappedTo:
1021 with warnings.catch_warnings():
1022 if isinstance(self.pack, type) and issubclass(
1023 self.pack, (abc.Mapping, abc.Set)
1024 ):
1025 warnings.simplefilter("ignore", BytesWarning)
1026 for _ in range(3):
1027 try:
1028 data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
1029 x = data.draw(self.mapped_strategy)
1030 result = self.pack(x)
1031 data.stop_span()
1032 current_build_context().record_call(result, self.pack, [x], {})
1033 return result
1034 except UnsatisfiedAssumption:
1035 data.stop_span(discard=True)
1036 raise UnsatisfiedAssumption
1037
1038 @property
1039 def branches(self) -> Sequence[SearchStrategy[MappedTo]]:
1040 return [
1041 MappedStrategy(strategy, pack=self.pack)
1042 for strategy in self.mapped_strategy.branches
1043 ]
1044
1045 def filter(
1046 self, condition: Callable[[MappedTo], Any]
1047 ) -> "SearchStrategy[MappedTo]":
1048 # Includes a special case so that we can rewrite filters on collection
1049 # lengths, when most collections are `st.lists(...).map(the_type)`.
1050 ListStrategy = _list_strategy_type()
1051 if not isinstance(self.mapped_strategy, ListStrategy) or not (
1052 (isinstance(self.pack, type) and issubclass(self.pack, abc.Collection))
1053 or self.pack in _collection_ish_functions()
1054 ):
1055 return super().filter(condition)
1056
1057 # Check whether our inner list strategy can rewrite this filter condition.
1058 # If not, discard the result and _only_ apply a new outer filter.
1059 new = ListStrategy.filter(self.mapped_strategy, condition)
1060 if getattr(new, "filtered_strategy", None) is self.mapped_strategy:
1061 return super().filter(condition) # didn't rewrite
1062
1063 # Apply a new outer filter even though we rewrote the inner strategy,
1064 # because some collections can change the list length (dict, set, etc).
1065 return FilteredStrategy(type(self)(new, self.pack), conditions=(condition,))
1066
1067
1068@lru_cache
1069def _list_strategy_type() -> Any:
1070 from hypothesis.strategies._internal.collections import ListStrategy
1071
1072 return ListStrategy
1073
1074
1075def _collection_ish_functions() -> Sequence[Any]:
1076 funcs = [sorted]
1077 if np := sys.modules.get("numpy"):
1078 # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
1079 # Probably only `np.array` and `np.asarray` will be used in practice,
1080 # but why should that stop us when we've already gone this far?
1081 funcs += [
1082 np.empty_like,
1083 np.eye,
1084 np.identity,
1085 np.ones_like,
1086 np.zeros_like,
1087 np.array,
1088 np.asarray,
1089 np.asanyarray,
1090 np.ascontiguousarray,
1091 np.asmatrix,
1092 np.copy,
1093 np.rec.array,
1094 np.rec.fromarrays,
1095 np.rec.fromrecords,
1096 np.diag,
1097 # bonus undocumented functions from tab-completion:
1098 np.asarray_chkfinite,
1099 np.asfortranarray,
1100 ]
1101
1102 return funcs
1103
1104
1105filter_not_satisfied = UniqueIdentifier("filter not satisfied")
1106
1107
1108class FilteredStrategy(SearchStrategy[Ex]):
1109 def __init__(
1110 self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...]
1111 ):
1112 super().__init__()
1113 if isinstance(strategy, FilteredStrategy):
1114 # Flatten chained filters into a single filter with multiple conditions.
1115 self.flat_conditions: tuple[Callable[[Ex], Any], ...] = (
1116 strategy.flat_conditions + conditions
1117 )
1118 self.filtered_strategy: SearchStrategy[Ex] = strategy.filtered_strategy
1119 else:
1120 self.flat_conditions = conditions
1121 self.filtered_strategy = strategy
1122
1123 assert isinstance(self.flat_conditions, tuple)
1124 assert not isinstance(self.filtered_strategy, FilteredStrategy)
1125
1126 self.__condition: Optional[Callable[[Ex], Any]] = None
1127
1128 def calc_is_empty(self, recur: RecurT) -> bool:
1129 return recur(self.filtered_strategy)
1130
1131 def calc_is_cacheable(self, recur: RecurT) -> bool:
1132 return recur(self.filtered_strategy)
1133
1134 def __repr__(self) -> str:
1135 if not hasattr(self, "_cached_repr"):
1136 self._cached_repr = "{!r}{}".format(
1137 self.filtered_strategy,
1138 "".join(
1139 f".filter({get_pretty_function_description(cond)})"
1140 for cond in self.flat_conditions
1141 ),
1142 )
1143 return self._cached_repr
1144
1145 def do_validate(self) -> None:
1146 # Start by validating our inner filtered_strategy. If this was a LazyStrategy,
1147 # validation also reifies it so that subsequent calls to e.g. `.filter()` will
1148 # be passed through.
1149 self.filtered_strategy.validate()
1150 # So now we have a reified inner strategy, we'll replay all our saved
1151 # predicates in case some or all of them can be rewritten. Note that this
1152 # replaces the `fresh` strategy too!
1153 fresh = self.filtered_strategy
1154 for cond in self.flat_conditions:
1155 fresh = fresh.filter(cond)
1156 if isinstance(fresh, FilteredStrategy):
1157 # In this case we have at least some non-rewritten filter predicates,
1158 # so we just re-initialize the strategy.
1159 FilteredStrategy.__init__(
1160 self, fresh.filtered_strategy, fresh.flat_conditions
1161 )
1162 else:
1163 # But if *all* the predicates were rewritten... well, do_validate() is
1164 # an in-place method so we still just re-initialize the strategy!
1165 FilteredStrategy.__init__(self, fresh, ())
1166
1167 def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]":
1168 # If we can, it's more efficient to rewrite our strategy to satisfy the
1169 # condition. We therefore exploit the fact that the order of predicates
1170 # doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by attempting to apply
1171 # condition directly to our filtered strategy as the inner-most filter.
1172 out = self.filtered_strategy.filter(condition)
1173 # If it couldn't be rewritten, we'll get a new FilteredStrategy - and then
1174 # combine the conditions of each in our expected newest=last order.
1175 if isinstance(out, FilteredStrategy):
1176 return FilteredStrategy(
1177 out.filtered_strategy, self.flat_conditions + out.flat_conditions
1178 )
1179 # But if it *could* be rewritten, we can return the more efficient form!
1180 return FilteredStrategy(out, self.flat_conditions)
1181
1182 @property
1183 def condition(self) -> Callable[[Ex], Any]:
1184 if self.__condition is None:
1185 if len(self.flat_conditions) == 1:
1186 # Avoid an extra indirection in the common case of only one condition.
1187 self.__condition = self.flat_conditions[0]
1188 elif len(self.flat_conditions) == 0:
1189 # Possible, if unlikely, due to filter predicate rewriting
1190 self.__condition = lambda _: True # type: ignore # covariant type param
1191 else:
1192 self.__condition = lambda x: all( # type: ignore # covariant type param
1193 cond(x) for cond in self.flat_conditions
1194 )
1195 return self.__condition
1196
1197 def do_draw(self, data: ConjectureData) -> Ex:
1198 result = self.do_filtered_draw(data)
1199 if result is not filter_not_satisfied:
1200 return cast(Ex, result)
1201
1202 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
1203
1204 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
1205 for i in range(3):
1206 data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
1207 value = data.draw(self.filtered_strategy)
1208 if self.condition(value):
1209 data.stop_span()
1210 return value
1211 else:
1212 data.stop_span(discard=True)
1213 if i == 0:
1214 data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
1215
1216 return filter_not_satisfied
1217
1218 @property
1219 def branches(self) -> Sequence[SearchStrategy[Ex]]:
1220 return [
1221 FilteredStrategy(strategy=strategy, conditions=self.flat_conditions)
1222 for strategy in self.filtered_strategy.branches
1223 ]
1224
1225
1226@check_function
1227def check_strategy(arg: object, name: str = "") -> None:
1228 assert isinstance(name, str)
1229 if not isinstance(arg, SearchStrategy):
1230 hint = ""
1231 if isinstance(arg, (list, tuple)):
1232 hint = ", such as st.sampled_from({}),".format(name or "...")
1233 if name:
1234 name += "="
1235 raise InvalidArgument(
1236 f"Expected a SearchStrategy{hint} but got {name}{arg!r} "
1237 f"(type={type(arg).__name__})"
1238 )