1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import os
12import sys
13import threading
14import warnings
15from collections import abc, defaultdict
16from collections.abc import Callable, Sequence
17from functools import lru_cache
18from random import shuffle
19from threading import RLock
20from typing import (
21 TYPE_CHECKING,
22 Any,
23 ClassVar,
24 Generic,
25 Literal,
26 TypeAlias,
27 TypeGuard,
28 TypeVar,
29 cast,
30 overload,
31)
32
33from hypothesis._settings import HealthCheck, Phase, Verbosity, settings
34from hypothesis.control import _current_build_context, current_build_context
35from hypothesis.errors import (
36 HypothesisException,
37 HypothesisWarning,
38 InvalidArgument,
39 NonInteractiveExampleWarning,
40 UnsatisfiedAssumption,
41)
42from hypothesis.internal.conjecture import utils as cu
43from hypothesis.internal.conjecture.data import ConjectureData
44from hypothesis.internal.conjecture.utils import (
45 calc_label_from_cls,
46 calc_label_from_hash,
47 calc_label_from_name,
48 combine_labels,
49)
50from hypothesis.internal.coverage import check_function
51from hypothesis.internal.reflection import (
52 get_pretty_function_description,
53 is_identity_function,
54)
55from hypothesis.strategies._internal.utils import defines_strategy
56from hypothesis.utils.conventions import UniqueIdentifier
57
# ``Ex`` is the covariant type parameter naming the kind of value a strategy
# produces. Only the definition seen by static type checkers passes
# ``default=Any``; the runtime definition omits it.
# NOTE(review): presumably because ``TypeVar(..., default=...)`` is not accepted
# at runtime on every Python version this module supports - confirm.
if TYPE_CHECKING:
    Ex = TypeVar("Ex", covariant=True, default=Any)
else:
    Ex = TypeVar("Ex", covariant=True)

# Additional, unconstrained type variables used in generic signatures.
T = TypeVar("T")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
MappedFrom = TypeVar("MappedFrom")
MappedTo = TypeVar("MappedTo")
# Signature of the ``recur`` callback that is handed to the ``calc_*`` methods:
# it takes a strategy and returns that strategy's value for the property.
RecurT: TypeAlias = Callable[["SearchStrategy"], bool]
# Marker stored in place of a real value while that value is still being
# computed; used to detect re-entrant (recursive) computation.
calculating = UniqueIdentifier("calculating")

# Labels derived from fixed descriptive names.
# NOTE(review): their users are not visible in this part of the file; the names
# suggest they mark individual draw attempts in MappedStrategy/FilteredStrategy.
MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "another attempted draw in MappedStrategy"
)

FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "single loop iteration in FilteredStrategy"
)

# Re-entrant lock held while ``SearchStrategy.label`` computes a label.
label_lock = RLock()
81
82
def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any:
    """Compute a lazily cached property that may be mutually recursive
    among a set of strategies.

    The property behaves like a memoized attribute with an optional override:
    if ``force_<name>`` or ``cached_<name>`` is already set on the strategy it
    is returned directly, otherwise the value is computed through the
    strategy's ``calc_<name>`` method and stored as ``cached_<name>``.

    Properties of different strategies can depend on each other, so a naive
    recursive evaluation may never terminate. Take ``is_empty`` and a strategy
    defined as ``x = st.deferred(lambda: x)``: it is certainly empty (drawing
    from ``x`` requires drawing from ``x``, and so on), yet naively asking for
    ``x.is_empty`` would call ``x.is_empty`` again, forever.

    The answer is a fixed point calculation. Every strategy starts out at
    ``default`` - the value the property takes absent evidence to the
    contrary - and values of dependent strategies are recalculated until
    nothing changes any more.

    The approach taken roughly follows that in section 4.2 of Adams,
    Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
    and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
    (2016): 224-236.

    Args:
        strategy: the strategy whose property value is requested.
        name: the property name; must be one of ``"is_empty"``,
            ``"has_reusable_values"`` or ``"is_cacheable"``.
        default: the value assumed for a strategy whose own calculation is
            still in progress or has not happened yet.

    Returns:
        The value of the property for ``strategy``. As a side effect, the
        computed value is cached on every strategy visited along the way.
    """
    assert name in {"is_empty", "has_reusable_values", "is_cacheable"}
    cache_key = f"cached_{name}"
    calculation = f"calc_{name}"
    force_key = f"force_{name}"

    def forced_value(target: SearchStrategy) -> Any:
        # An explicit override takes precedence over a memoized result. If
        # neither attribute exists, the AttributeError reaches the caller.
        try:
            return getattr(target, force_key)
        except AttributeError:
            return getattr(target, cache_key)

    # Fast path: the answer is already known for the requested strategy.
    try:
        return forced_value(strategy)
    except AttributeError:
        pass

    mapping: dict[SearchStrategy, Any] = {}
    sentinel = object()
    hit_recursion = False

    # First pass: plain recursive evaluation. A strategy that is reached
    # again while its own value is still being calculated is not descended
    # into; we record that this happened and answer with the default.
    def recur(strat: SearchStrategy) -> Any:
        nonlocal hit_recursion
        try:
            return forced_value(strat)
        except AttributeError:
            pass
        known = mapping.get(strat, sentinel)
        if known is sentinel:
            # Not visited yet: mark as in progress, then calculate.
            mapping[strat] = calculating
            computed = getattr(strat, calculation)(recur)
            mapping[strat] = computed
            return computed
        if known is calculating:
            hit_recursion = True
            return default
        return known

    recur(strategy)

    # Without self-recursion the first pass is exact and nothing is left to
    # do. Otherwise some entries were calculated from a guessed default and
    # may be wrong, so every visited strategy is queued for the more careful
    # fixed point iteration below. Hopefully the first pass is already close
    # and only a few updates are required.
    if not hit_recursion:
        needs_update = None
    else:
        needs_update = set(mapping)

        # listeners[B] holds every strategy A that consulted B while
        # calculating its own value: whenever the value of B changes, each
        # such A may have to be recalculated.
        listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set)

    def recur2(strat: SearchStrategy) -> Any:
        # Build the callback handed to ``strat``'s calculation during the
        # fixed point iteration; it registers ``strat`` as a listener of
        # everything it asks about.
        def recur_inner(other: SearchStrategy) -> Any:
            try:
                return forced_value(other)
            except AttributeError:
                pass
            listeners[other].add(strat)
            current = mapping.get(other, sentinel)
            if current is not sentinel:
                return current
            # First sighting of this strategy: start it at the default and
            # schedule it for calculation in the next round.
            assert needs_update is not None
            needs_update.add(other)
            mapping[other] = default
            return default

        return recur_inner

    rounds = 0
    seen_states = set()
    while needs_update:
        rounds += 1
        # If we seem to be taking a really long time to stabilize we
        # start tracking seen values to attempt to detect an infinite
        # loop. This should be impossible, and most code will never
        # hit the count, but having an assertion for it means that
        # testing is easier to debug and we don't just have a hung
        # test.
        # Note: This is actually covered, by test_very_deep_deferral
        # in tests/cover/test_deferred_strategies.py. Unfortunately it
        # runs into a coverage bug. See
        # https://github.com/nedbat/coveragepy/issues/605
        # for details.
        if rounds > 50:  # pragma: no cover
            key = frozenset(mapping.items())
            assert key not in seen_states, (key, name)
            seen_states.add(key)
        # Swap in a fresh set *before* recalculating, so that anything
        # scheduled during this round lands in the next round's work list.
        pending = needs_update
        needs_update = set()
        for strat in pending:
            new_value = getattr(strat, calculation)(recur2(strat))
            if new_value != mapping[strat]:
                needs_update.update(listeners[strat])
                mapping[strat] = new_value

    # The mapping is now complete and accurate for every strategy seen during
    # this calculation, so cache all of the values at once rather than only
    # the one that was asked for.
    for visited, value in mapping.items():
        setattr(visited, cache_key, value)
    return getattr(strategy, cache_key)
220
221
class SearchStrategy(Generic[Ex]):
    """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input.

    This class is only part of the public API for use in type annotations, so that
    you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns
    ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class.
    """

    __module__: str = "hypothesis.strategies"
    # Cache of class labels, keyed by the concrete strategy class and filled
    # in lazily by ``class_label``.
    LABELS: ClassVar[dict[type, int]] = {}
    # triggers `assert isinstance(label, int)` under threading when setting this
    # in init instead of a classvar. I'm not sure why, init should be safe. But
    # this works so I'm not looking into it further atm.
    __label: int | UniqueIdentifier | None = None

    def __init__(self):
        # Maps a thread identifier to whether ``validate`` has already been
        # entered for this strategy on that thread.
        self.validate_called: dict[int, bool] = {}

    def is_currently_empty(self, data: ConjectureData) -> bool:
        """
        Returns whether this strategy is currently empty. Unlike ``is_empty``,
        which is computed based on static information and cannot change,
        ``is_currently_empty`` may change over time based on choices made
        during the test case.

        This is currently only used for stateful testing, where |Bundle| grows a
        list of values to choose from over the course of a test case.

        ``data`` will only be used for introspection. No values will be drawn
        from it in a way that modifies the choice sequence.
        """
        return self.is_empty

    @property
    def is_empty(self) -> Any:
        # True if this strategy can never draw a value and will always result
        # in the data being marked invalid.
        # A False result does not guarantee that a valid value can be drawn -
        # this is not intended to be perfect, and is primarily intended to be
        # an optimisation for some cases.
        return recursive_property(self, "is_empty", default=True)

    @property
    def has_reusable_values(self) -> Any:
        # True if values from this strategy can be implicitly reused (e.g. as
        # background values in a numpy array) without causing surprising
        # user-visible behaviour. Should be false for built-in strategies that
        # produce mutable values, and for strategies that have been
        # mapped/filtered by arbitrary user-provided functions.
        return recursive_property(self, "has_reusable_values", default=True)

    @property
    def is_cacheable(self) -> Any:
        """
        Whether it is safe to hold on to instances of this strategy in a cache.
        See _STRATEGY_CACHE.
        """
        return recursive_property(self, "is_cacheable", default=True)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return True

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Note: It is correct and significant that the default return value
        # from calc_is_empty is False despite the default value for is_empty
        # being true. The reason for this is that strategies should be treated
        # as empty absent evidence to the contrary, but most basic strategies
        # are trivially non-empty and it would be annoying to have to override
        # this method to show that.
        return False

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        return False

    def example(self) -> Ex:  # FIXME
        """Provide an example of the sort of value that this strategy generates.

        This method is designed for use in a REPL, and will raise an error if
        called from inside |@given| or a strategy definition. For serious use,
        see |@composite| or |st.data|.
        """
        # ``sys.ps1`` is only looked up, never required: it is treated as the
        # marker of an interactive interpreter session.
        at_interactive_prompt = getattr(sys, "ps1", None) is not None
        if not at_interactive_prompt and (
            # The main module's __spec__ is None when running interactively
            # or running a source file directly.
            # See https://docs.python.org/3/reference/import.html#main-spec.
            sys.modules["__main__"].__spec__ is not None
            # __spec__ is also None under pytest-xdist. To avoid an unfortunate
            # missed alarm here, always warn under pytest.
            or os.environ.get("PYTEST_CURRENT_TEST") is not None
        ):  # pragma: no branch
            # The other branch *is* covered in cover/test_interactive_example.py;
            # but as that uses `pexpect` for an interactive session `coverage`
            # doesn't see it.
            warnings.warn(
                "The `.example()` method is good for exploring strategies, but should "
                "only be used interactively. We recommend using `@given` for tests - "
                "it performs better, saves and replays failures to avoid flakiness, "
                f"and reports minimal examples. (strategy: {self!r})",
                NonInteractiveExampleWarning,
                stacklevel=2,
            )

        # Refuse to run while a build context is active, i.e. from within a
        # test function or a strategy definition.
        context = _current_build_context.value
        if context is not None:
            inside_strategy = context.data is not None and context.data.depth > 0
            if inside_strategy:
                raise HypothesisException(
                    "Using example() inside a strategy definition is a bad "
                    "idea. Instead consider using hypothesis.strategies.builds() "
                    "or @hypothesis.strategies.composite to define your strategy."
                    " See https://hypothesis.readthedocs.io/en/latest/reference/"
                    "strategies.html#hypothesis.strategies.builds or "
                    "https://hypothesis.readthedocs.io/en/latest/reference/"
                    "strategies.html#hypothesis.strategies.composite for more "
                    "details."
                )
            raise HypothesisException(
                "Using example() inside a test function is a bad "
                "idea. Instead consider using hypothesis.strategies.data() "
                "to draw more examples during testing. See "
                "https://hypothesis.readthedocs.io/en/latest/reference/"
                "strategies.html#hypothesis.strategies.data for more details."
            )

        # Serve a previously generated example if one is left over. The
        # attribute does not exist before the first call and the list may
        # have run dry; either way start a fresh batch.
        try:
            return self.__examples.pop()
        except (AttributeError, IndexError):
            self.__examples: list[Ex] = []

        from hypothesis.core import given

        # Note: this function has a weird name because it might appear in
        # tracebacks, and we want users to know that they can ignore it.
        @given(self)
        @settings(
            database=None,
            # generate only a few examples at a time to avoid slow interactivity
            # for large strategies. The overhead of @given is very small relative
            # to generation, so a small batch size is fine.
            max_examples=10,
            deadline=None,
            verbosity=Verbosity.quiet,
            phases=(Phase.generate,),
            suppress_health_check=list(HealthCheck),
        )
        def example_generating_inner_function(
            ex: Ex,  # type: ignore # mypy is overzealous in preventing covariant params
        ) -> None:
            self.__examples.append(ex)

        example_generating_inner_function()
        # Shuffle so that examples are not handed out in generation order.
        shuffle(self.__examples)
        return self.__examples.pop()

    def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
        """Returns a new strategy which generates a value from this one, and
        then returns ``pack(value)``.  For example, ``integers().map(str)``
        could generate ``str(5)`` == ``"5"``.
        """
        # Mapping through an identity function changes nothing, so skip the
        # wrapper strategy entirely.
        if is_identity_function(pack):
            return self  # type: ignore  # Mypy has no way to know that `Ex == T`
        return MappedStrategy(self, pack=pack)

    def flatmap(
        self, expand: Callable[[Ex], "SearchStrategy[T]"]
    ) -> "SearchStrategy[T]":  # FIXME
        """Old syntax for a special case of |@composite|:

        .. code-block:: python

            @st.composite
            def flatmap_like(draw, base_strategy, expand):
                value = draw(base_strategy)
                new_strategy = expand(value)
                return draw(new_strategy)

        We find that the greater readability of |@composite| usually outweighs
        the verbosity, with a few exceptions for simple cases or recipes like
        ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for
        any instance of that type, and then generate one of those").
        """
        from hypothesis.strategies._internal.flatmapped import FlatMapStrategy

        return FlatMapStrategy(self, expand=expand)

    # Note that we previously had condition extracted to a type alias as
    # PredicateT. However, that was only useful when not specifying a relationship
    # between the generic Ts and some other function param / return value.
    # If we do want to - like here, where we want to say that the Ex arg to condition
    # is of the same type as the strategy's Ex - then you need to write out the
    # entire Callable[[Ex], Any] expression rather than use a type alias.
    # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not
    # reference the local TypeVar context.
    @overload
    def filter(
        self, condition: Callable[[Ex], TypeGuard[T]]
    ) -> "SearchStrategy[T]": ...
    @overload
    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]": ...
    def filter(self, condition):
        """Returns a new strategy that generates values from this strategy
        which satisfy the provided condition.

        Note that if the condition is too hard to satisfy this might result
        in your tests failing with an Unsatisfiable exception.
        A basic version of the filtering logic would look something like:

        .. code-block:: python

            @st.composite
            def filter_like(draw, strategy, condition):
                for _ in range(3):
                    value = draw(strategy)
                    if condition(value):
                        return value
                assume(False)
        """
        return FilteredStrategy(self, conditions=(condition,))

    @property
    def branches(self) -> Sequence["SearchStrategy[Ex]"]:
        # A plain strategy has exactly one branch: itself.
        return [self]

    def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Ex | T]":
        """Return a strategy which produces values by randomly drawing from one
        of this strategy or the other strategy.

        This method is part of the public API.
        """
        if not isinstance(other, SearchStrategy):
            raise ValueError(f"Cannot | a SearchStrategy with {other!r}")

        # Unwrap explicitly or'd strategies. This turns the
        # common case of e.g. st.integers() | st.integers() | st.integers() from
        #
        #   one_of(one_of(integers(), integers()), integers())
        #
        # into
        #
        #   one_of(integers(), integers(), integers())
        #
        # This is purely an aesthetic unwrapping, for e.g. reprs. In practice
        # we use .branches / .element_strategies to get the list of possible
        # strategies, so this unwrapping is *not* necessary for correctness.
        strategies: list[SearchStrategy] = []
        for operand in (self, other):
            if isinstance(operand, OneOfStrategy):
                strategies.extend(operand.original_strategies)
            else:
                strategies.append(operand)
        return OneOfStrategy(strategies)

    def __bool__(self) -> bool:
        # Truth-testing a strategy is almost certainly a mistake, so warn, but
        # keep the ordinary "objects are truthy" result.
        warnings.warn(
            f"bool({self!r}) is always True, did you mean to draw a value?",
            HypothesisWarning,
            stacklevel=2,
        )
        return True

    def validate(self) -> None:
        """Throw an exception if the strategy is not valid.

        Strategies should implement ``do_validate``, which is called by this
        method. They should not override ``validate``.

        This can happen due to invalid arguments, or lazy construction.
        """
        thread_id = threading.get_ident()
        if self.validate_called.get(thread_id):
            return
        # we need to set validate_called before calling do_validate, for
        # recursive / deferred strategies. But if a thread switches after
        # validate_called but before do_validate, we might have a strategy
        # which does weird things like drawing when do_validate would error but
        # its params are technically valid (e.g. a param was passed as 1.0
        # instead of 1) and get into weird internal states.
        #
        # There are two ways to fix this.
        # (1) The first is a per-strategy lock around do_validate. Even though we
        #   expect near-zero lock contention, this still adds the lock overhead.
        # (2) The second is allowing concurrent .validate calls. Since validation
        #   is (assumed to be) deterministic, both threads will produce the same
        #   end state, so the validation order or race conditions does not matter.
        #
        # In order to avoid the lock overhead of (1), we use (2) here. See also
        # discussion in https://github.com/HypothesisWorks/hypothesis/pull/4473.
        try:
            self.validate_called[thread_id] = True
            self.do_validate()
            # Evaluated for their side effect only: this computes (and caches)
            # both recursive properties as part of validation.
            self.is_empty
            self.has_reusable_values
        except Exception:
            # Validation failed, so allow it to be attempted again later.
            self.validate_called[thread_id] = False
            raise

    @property
    def class_label(self) -> int:
        cls = self.__class__
        registry = cls.LABELS
        try:
            return registry[cls]
        except KeyError:
            pass
        # First request for this class: compute the label and remember it.
        computed = calc_label_from_cls(cls)
        registry[cls] = computed
        return computed

    @property
    def label(self) -> int:
        current = self.__label
        if isinstance(current, int):
            # avoid locking if we've already completely computed the label.
            return current

        with label_lock:
            # Re-entrant request while this label is still being calculated:
            # answer with 0 instead of recursing.
            if self.__label is calculating:
                return 0
            self.__label = calculating
            self.__label = self.calc_label()
            return self.__label

    def calc_label(self) -> int:
        return self.class_label

    def do_validate(self) -> None:
        # Hook for subclasses; the base strategy has nothing to check.
        pass

    def do_draw(self, data: ConjectureData) -> Ex:
        raise NotImplementedError(f"{type(self).__name__}.do_draw")
554
555
556def _is_hashable(value: object) -> tuple[bool, int | None]:
557 # hashing can be expensive; return the hash value if we compute it, so that
558 # callers don't have to recompute.
559 try:
560 return (True, hash(value))
561 except TypeError:
562 return (False, None)
563
564
def is_hashable(value: object) -> bool:
    """Return whether ``value`` can be hashed, discarding the hash itself."""
    hashable, _ = _is_hashable(value)
    return hashable
567
568
class SampledFromStrategy(SearchStrategy[Ex]):
    """A strategy which samples from a set of elements. This is essentially
    equivalent to using a OneOfStrategy over Just strategies but may be more
    efficient and convenient.
    """

    # Upper bound on how many elements a single filtered draw will try.
    _MAX_FILTER_CALLS: ClassVar[int] = 10_000

    def __init__(
        self,
        elements: Sequence[Ex],
        *,
        force_repr: str | None = None,
        force_repr_braces: tuple[str, str] | None = None,
        transformations: tuple[
            tuple[Literal["filter", "map"], Callable[[Ex], Any]],
            ...,
        ] = (),
    ):
        super().__init__()
        self.elements = cu.check_sample(elements, "sampled_from")
        assert self.elements
        self.force_repr = force_repr
        self.force_repr_braces = force_repr_braces
        # Ordered ("map" | "filter", function) pairs which are applied to a
        # sampled element by ``_transform``.
        self._transformations = transformations

        self._cached_repr: str | None = None

    def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]:
        # Rather than wrapping this strategy, build a copy which carries one
        # more transformation step.
        extended = (*self._transformations, ("map", pack))
        mapped = type(self)(
            self.elements,
            force_repr=self.force_repr,
            force_repr_braces=self.force_repr_braces,
            transformations=extended,
        )
        # guaranteed by the ("map", pack) transformation
        return cast(SearchStrategy[T], mapped)

    @overload
    def filter(
        self, condition: Callable[[Ex], TypeGuard[T]]
    ) -> "SearchStrategy[T]": ...
    @overload
    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]": ...
    def filter(self, condition):
        # Same approach as ``map``: a copy with one more transformation step.
        extended = (*self._transformations, ("filter", condition))
        return type(self)(
            self.elements,
            force_repr=self.force_repr,
            force_repr_braces=self.force_repr_braces,
            transformations=extended,
        )

    def __repr__(self):
        if self._cached_repr is not None:
            return self._cached_repr

        describe = get_pretty_function_description
        # Show at most the first 512 elements, marking the cut with an
        # ellipsis.
        truncated = len(self.elements) > 512
        shown = self.elements[:512] if truncated else self.elements
        elements_s = ", ".join(describe(v) for v in shown)
        if truncated:
            elements_s += ", ..."
        braces = self.force_repr_braces or ("(", ")")
        instance_s = (
            self.force_repr or f"sampled_from({braces[0]}{elements_s}{braces[1]})"
        )
        transforms_s = "".join(
            f".{name}({describe(f)})" for name, f in self._transformations
        )
        self._cached_repr = instance_s + transforms_s
        return self._cached_repr

    def calc_label(self) -> int:
        # strategy.label is effectively an under-approximation of structural
        # equality (i.e., some strategies may have the same label when they are not
        # structurally identical). More importantly for calculating the
        # SampledFromStrategy label, we might have hash(s1) != hash(s2) even
        # when s1 and s2 are structurally identical. For instance:
        #
        #   s1 = st.sampled_from([st.none()])
        #   s2 = st.sampled_from([st.none()])
        #   assert hash(s1) != hash(s2)
        #
        # (see also test cases in test_labels.py).
        #
        # We therefore use the labels of any component strategies when calculating
        # our label, and only use the hash if it is not a strategy.
        #
        # That's the ideal, anyway. In reality the logic is more complicated than
        # necessary in order to be efficient in the presence of (very) large sequences:
        # * add an unabashed special case for range, to avoid iteration over an
        #   enormous range when we know it is entirely integers.
        # * if there is at least one strategy in self.elements, use strategy label,
        #   and the element hash otherwise.
        # * if there are no strategies in self.elements, take the hash of the
        #   entire sequence. This prevents worst-case performance of hashing each
        #   element when a hash of the entire sequence would have sufficed.
        #
        # The worst case performance of this scheme is
        # itertools.chain(range(2**100), [st.none()]), where it degrades to
        # hashing every int in the range.
        elements_is_hashable, hash_value = _is_hashable(self.elements)
        # The range check must come first so that a range short-circuits the
        # scan over its elements.
        use_sequence_hash = isinstance(self.elements, range) or (
            elements_is_hashable
            and not any(isinstance(e, SearchStrategy) for e in self.elements)
        )
        if use_sequence_hash:
            return combine_labels(
                self.class_label, calc_label_from_name(str(hash_value))
            )

        # Element-wise labels; unhashable elements contribute nothing.
        labels = [self.class_label]
        labels.extend(
            (
                element.label
                if isinstance(element, SearchStrategy)
                else calc_label_from_hash(element)
            )
            for element in self.elements
            if is_hashable(element)
        )
        return combine_labels(*labels)

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        # Because our custom .map/.filter implementations skip the normal
        # wrapper strategies (which would automatically return False for us),
        # we need to manually return False here if any transformations have
        # been applied.
        return not self._transformations

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return is_hashable(self.elements)

    def _transform(
        self,
        # https://github.com/python/mypy/issues/7049, we're not writing `element`
        # anywhere in the class so this is still type-safe. mypy is being more
        # conservative than necessary
        element: Ex,  # type: ignore
    ) -> Ex | UniqueIdentifier:
        # Used in UniqueSampledListStrategy
        #
        # Applies the recorded transformations in order. Returns
        # ``filter_not_satisfied`` as soon as a filter rejects the value.
        for kind, fn in self._transformations:
            if kind != "map":
                assert kind == "filter"
                if not fn(element):
                    return filter_not_satisfied
                continue
            mapped = fn(element)
            # Report the call to the active build context, if there is one.
            if build_context := _current_build_context.value:
                build_context.record_call(mapped, fn, args=[element], kwargs={})
            element = mapped
        return element

    def do_draw(self, data: ConjectureData) -> Ex:
        drawn = self.do_filtered_draw(data)
        # Sampling strategies from a collection consisting solely of
        # strategies: leave a hint on ``data`` suggesting one_of instead.
        if isinstance(drawn, SearchStrategy) and all(
            isinstance(x, SearchStrategy) for x in self.elements
        ):
            data._sampled_from_all_strategies_elements_message = (
                "sampled_from was given a collection of strategies: "
                "{!r}. Was one_of intended?",
                self.elements,
            )
        if drawn is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
        assert not isinstance(drawn, UniqueIdentifier)
        return drawn

    def get_element(self, i: int) -> Ex | UniqueIdentifier:
        # The i-th element after transformations, or ``filter_not_satisfied``.
        return self._transform(self.elements[i])

    def do_filtered_draw(self, data: ConjectureData) -> Ex | UniqueIdentifier:
        size = len(self.elements)
        # Set of indices that have been tried so far, so that we never test
        # the same element twice during a draw.
        known_bad_indices: set[int] = set()

        # Start with ordinary rejection sampling. It's fast if it works, and
        # if it doesn't work then it was only a small amount of overhead.
        for _ in range(3):
            i = data.draw_integer(0, size - 1)
            if i in known_bad_indices:
                continue
            element = self.get_element(i)
            if element is not filter_not_satisfied:
                return element
            # Record the retry event once, on the first rejection.
            if not known_bad_indices:
                data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
            known_bad_indices.add(i)

        # If we've tried all the possible elements, give up now.
        max_good_indices = size - len(known_bad_indices)
        if not max_good_indices:
            return filter_not_satisfied

        # Impose an arbitrary cutoff to prevent us from wasting too much time
        # on very large element lists.
        scan_limit = self._MAX_FILTER_CALLS - 3
        max_good_indices = min(max_good_indices, scan_limit)

        # Before building the list of allowed indices, speculatively choose
        # one of them. We don't yet know how many allowed indices there will be,
        # so this choice might be out-of-bounds, but that's OK.
        speculative_index = data.draw_integer(0, max_good_indices - 1)

        # Calculate the indices of allowed values, so that we can choose one
        # of them at random. But if we encounter the speculatively-chosen one,
        # just use that and return immediately.  Note that we also track the
        # allowed elements, in case of .map(some_stateful_function)
        allowed: list[tuple[int, Ex]] = []
        for i in range(min(size, scan_limit)):
            if i in known_bad_indices:
                continue
            element = self.get_element(i)
            if element is filter_not_satisfied:
                continue
            assert not isinstance(element, UniqueIdentifier)
            allowed.append((i, element))
            if len(allowed) > speculative_index:
                # Early-exit case: We reached the speculative index, so
                # we just return the corresponding element.
                data.draw_integer(0, size - 1, forced=i)
                return element

        # If there are no allowed indices, the filter couldn't be satisfied.
        if not allowed:
            return filter_not_satisfied

        # The speculative index didn't work out, but at this point we've built
        # and can choose from the complete list of allowed indices and elements.
        i, element = data.choice(allowed)
        data.draw_integer(0, size - 1, forced=i)
        return element
796
797
class OneOfStrategy(SearchStrategy[Ex]):
    """Implements a union of strategies. Given a number of strategies this
    generates values which could have come from any of them.

    The conditional distribution draws uniformly at random from some
    non-empty subset of these strategies and then draws from the
    conditional distribution of that strategy.
    """

    def __init__(self, strategies: Sequence[SearchStrategy[Ex]]):
        super().__init__()
        self.original_strategies = tuple(strategies)
        # Flattened, de-duplicated, non-empty branches; computed on first use
        # by ``element_strategies``.
        self.__element_strategies: Sequence[SearchStrategy[Ex]] | None = None
        # True while ``branches`` is computing ``element_strategies``; used to
        # detect re-entrant access.
        self.__in_branches = False
        self._branches_lock = RLock()

    def calc_is_empty(self, recur: RecurT) -> bool:
        return all(map(recur, self.original_strategies))

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        return all(map(recur, self.original_strategies))

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return all(map(recur, self.original_strategies))

    @property
    def element_strategies(self) -> Sequence[SearchStrategy[Ex]]:
        if self.__element_strategies is not None:
            return self.__element_strategies

        # While strategies are hashable, they use object.__hash__ and are
        # therefore distinguished only by identity.
        #
        # In principle we could "just" define a __hash__ method
        # (and __eq__, but that's easy in terms of type() and hash())
        # to make this more powerful, but this is harder than it sounds:
        #
        # 1. Strategies are often distinguished by non-hashable attributes,
        #    or by attributes that have the same hash value ("^.+" / b"^.+").
        # 2. LazyStrategy: can't reify the wrapped strategy without breaking
        #    laziness, so there's a hash each for the lazy and the nonlazy.
        #
        # Having made several attempts, the minor benefits of making strategies
        # hashable are simply not worth the engineering effort it would take.
        # See also issues #2291 and #2327.
        seen: set[SearchStrategy] = {self}
        flattened: list[SearchStrategy] = []
        for arg in self.original_strategies:
            check_strategy(arg)
            if arg.is_empty:
                continue
            for branch in arg.branches:
                # Skip ourselves, duplicates, and branches that are empty.
                if branch in seen or branch.is_empty:
                    continue
                seen.add(branch)
                flattened.append(branch)
        self.__element_strategies = flattened
        return self.__element_strategies

    def calc_label(self) -> int:
        component_labels = (p.label for p in self.original_strategies)
        return combine_labels(self.class_label, *component_labels)

    def do_draw(self, data: ConjectureData) -> Ex:
        # First pick one of the branches that is not currently empty, then
        # draw the actual value from it.
        strategy = data.draw(
            SampledFromStrategy(self.element_strategies).filter(
                lambda s: not s.is_currently_empty(data)
            )
        )
        return data.draw(strategy)

    def __repr__(self) -> str:
        components = ", ".join(map(repr, self.original_strategies))
        return "one_of({})".format(components)

    def do_validate(self) -> None:
        for element in self.element_strategies:
            element.validate()

    @property
    def branches(self) -> Sequence[SearchStrategy[Ex]]:
        if self.__element_strategies is not None:
            # common fast path which avoids the lock
            return self.element_strategies

        with self._branches_lock:
            # Re-entered while our own branches are still being computed:
            # report ourselves as the only branch instead of recursing.
            if self.__in_branches:
                return [self]
            try:
                self.__in_branches = True
                return self.element_strategies
            finally:
                self.__in_branches = False

    @overload
    def filter(
        self, condition: Callable[[Ex], TypeGuard[T]]
    ) -> "SearchStrategy[T]": ...
    @overload
    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]": ...
    def filter(self, condition):
        # Push the condition down into each alternative; the outer
        # FilteredStrategy itself carries no conditions.
        filtered_branches = [s.filter(condition) for s in self.original_strategies]
        return FilteredStrategy(
            OneOfStrategy(filtered_branches),
            conditions=(),
        )
900
901
# Typing-only overloads for ``one_of``: a single sequence of strategies, one to
# five positional strategies (each with its own type variable, so the result is
# typed as the union of their example types), and a variadic fallback typed as
# ``Any``. The bodies are never executed, hence ``# pragma: no cover``.
@overload
def one_of(
    __args: Sequence[SearchStrategy[Ex]],
) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex], __a2: SearchStrategy[T]
) -> SearchStrategy[Ex | T]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3]
) -> SearchStrategy[Ex | T | T3]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
) -> SearchStrategy[Ex | T | T3 | T4]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
    __a5: SearchStrategy[T5],
) -> SearchStrategy[Ex | T | T3 | T4 | T5]:  # pragma: no cover
    ...


@overload
def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]:  # pragma: no cover
    ...
952
953
@defines_strategy(eager=True)
def one_of(
    *args: Sequence[SearchStrategy[Any]] | SearchStrategy[Any],
) -> SearchStrategy[Any]:
    # Mypy workaround alert:  Any is too loose above; the return parameter
    # should be the union of the input parameters.  Unfortunately, Mypy <=0.600
    # raises errors due to incompatible inputs instead.  See #1270 for links.
    # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
    """Return a strategy which generates values from any of the argument
    strategies.

    This may be called with one iterable argument instead of multiple
    strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
    equivalent.

    Examples from this strategy will generally shrink to ones that come from
    strategies earlier in the list, then shrink according to behaviour of the
    strategy that produced them. In order to get good shrinking behaviour,
    try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
    better than ``one_of(text(), none())``.

    This is especially important when using recursive strategies. e.g.
    ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
    but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
    very badly indeed.
    """
    if len(args) == 1:
        (sole_arg,) = args
        if not isinstance(sole_arg, SearchStrategy):
            # A lone non-strategy argument is treated as an iterable of
            # strategies. If it cannot be iterated, ``args`` is left alone and
            # the checks below deal with it.
            try:
                args = tuple(sole_arg)
            except TypeError:
                pass
    if len(args) == 1 and isinstance(args[0], SearchStrategy):
        # This special-case means that we can one_of over lists of any size
        # without incurring any performance overhead when there is only one
        # strategy, and keeps our reprs simple.
        return args[0]
    if args and all(not isinstance(a, SearchStrategy) for a in args):
        # And this special case is to give a more-specific error message if it
        # seems that the user has confused `one_of()` for `sampled_from()`;
        # the remaining validation is left to OneOfStrategy.  See PR #2627.
        raise InvalidArgument(
            f"Did you mean st.sampled_from({list(args)!r})?  st.one_of() is used "
            "to combine strategies, but all of the arguments were of other types."
        )
    # The single-iterable form one_of([s1, s2, ...]) was unpacked above, so at
    # this point ``args`` is treated as a plain sequence of strategies.
    return OneOfStrategy(cast(Sequence[SearchStrategy], args))
1002
1003
class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]):
    """A strategy whose values are another strategy's values passed through
    a conversion function.

    Each value is drawn from ``mapped_strategy`` and handed to ``pack``;
    whatever ``pack`` returns is the generated value.
    """

    def __init__(
        self,
        strategy: SearchStrategy[MappedFrom],
        pack: Callable[[MappedFrom], MappedTo],
    ) -> None:
        super().__init__()
        self.pack = pack
        self.mapped_strategy = strategy

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Delegates entirely to the wrapped strategy.
        inner = self.mapped_strategy
        return recur(inner)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        # Delegates entirely to the wrapped strategy.
        inner = self.mapped_strategy
        return recur(inner)

    def __repr__(self) -> str:
        # The repr is built at most once per instance and then reused.
        if hasattr(self, "_cached_repr"):
            return self._cached_repr
        self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})"
        return self._cached_repr

    def do_validate(self) -> None:
        inner = self.mapped_strategy
        inner.validate()

    def do_draw(self, data: ConjectureData) -> MappedTo:
        """Draw from the wrapped strategy and convert the value with ``pack``.

        Up to three attempts are made; an attempt that raises
        ``UnsatisfiedAssumption`` has its span discarded and is retried. If
        all three fail, ``UnsatisfiedAssumption`` is raised.
        """
        with warnings.catch_warnings():
            packs_mapping_or_set = isinstance(self.pack, type) and issubclass(
                self.pack, (abc.Mapping, abc.Set)
            )
            if packs_mapping_or_set:
                # NOTE(review): presumably building such a collection can
                # trigger BytesWarning (str/bytes comparison) - confirm.
                warnings.simplefilter("ignore", BytesWarning)
            for _attempt in range(3):
                try:
                    data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
                    drawn = data.draw(self.mapped_strategy)
                    result = self.pack(drawn)
                    data.stop_span()
                    current_build_context().record_call(
                        result, self.pack, args=[drawn], kwargs={}
                    )
                    return result
                except UnsatisfiedAssumption:
                    data.stop_span(discard=True)
        raise UnsatisfiedAssumption

    @property
    def branches(self) -> Sequence[SearchStrategy[MappedTo]]:
        # One mapped strategy per branch of the wrapped strategy, each using
        # the same ``pack``.
        return [
            MappedStrategy(branch, pack=self.pack)
            for branch in self.mapped_strategy.branches
        ]

    @overload
    def filter(
        self, condition: Callable[[MappedTo], TypeGuard[T]]
    ) -> "SearchStrategy[T]": ...
    @overload
    def filter(
        self, condition: Callable[[MappedTo], Any]
    ) -> "SearchStrategy[MappedTo]": ...
    def filter(self, condition):
        # Includes a special case so that we can rewrite filters on collection
        # lengths, when most collections are `st.lists(...).map(the_type)`.
        ListStrategy = _list_strategy_type()
        if not isinstance(self.mapped_strategy, ListStrategy):
            return super().filter(condition)
        packs_a_collection = isinstance(self.pack, type) and issubclass(
            self.pack, abc.Collection
        )
        if not packs_a_collection and self.pack not in _collection_ish_functions():
            return super().filter(condition)

        # Check whether our inner list strategy can rewrite this filter condition.
        # If not, discard the result and _only_ apply a new outer filter.
        rewritten = ListStrategy.filter(self.mapped_strategy, condition)
        if getattr(rewritten, "filtered_strategy", None) is self.mapped_strategy:
            return super().filter(condition)  # didn't rewrite

        # Apply a new outer filter even though we rewrote the inner strategy,
        # because some collections can change the list length (dict, set, etc).
        remapped = type(self)(rewritten, self.pack)
        return FilteredStrategy(remapped, conditions=(condition,))
1088
1089
@lru_cache
def _list_strategy_type() -> Any:
    """Return the ``ListStrategy`` class.

    The import is done inside the function rather than at module level
    (presumably to avoid an import cycle with the collections module -- TODO
    confirm). Because of ``lru_cache`` the function body runs only on the
    first call; later calls return the cached class.
    """
    from hypothesis.strategies._internal.collections import ListStrategy

    return ListStrategy
1095
1096
def _collection_ish_functions() -> Sequence[Any]:
    """List the callables that ``MappedStrategy.filter`` accepts as
    collection-building ``pack`` functions.

    ``sorted`` is always included. The numpy functions are added only when
    numpy is already present in ``sys.modules``; this function never imports
    numpy itself. A new list is built on every call.
    """
    np = sys.modules.get("numpy")
    if not np:
        return [sorted]
    return [
        sorted,
        # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
        # Probably only `np.array` and `np.asarray` will be used in practice,
        # but why should that stop us when we've already gone this far?
        np.empty_like,
        np.eye,
        np.identity,
        np.ones_like,
        np.zeros_like,
        np.array,
        np.asarray,
        np.asanyarray,
        np.ascontiguousarray,
        np.asmatrix,
        np.copy,
        np.rec.array,
        np.rec.fromarrays,
        np.rec.fromrecords,
        np.diag,
        # bonus undocumented functions from tab-completion:
        np.asarray_chkfinite,
        np.asfortranarray,
    ]
1125
1126
# Sentinel returned by ``FilteredStrategy.do_filtered_draw`` when none of its
# attempts produced a value satisfying the filter; callers compare against it
# by identity (``is`` / ``is not``).
filter_not_satisfied = UniqueIdentifier("filter not satisfied")
1128
1129
class FilteredStrategy(SearchStrategy[Ex]):
    """A strategy that yields only those values of an inner strategy which
    satisfy every predicate in a tuple of conditions.

    Chained filters are flattened on construction, so ``filtered_strategy`` is
    never itself a ``FilteredStrategy`` and ``flat_conditions`` holds all of
    the predicates, oldest first.
    """

    def __init__(
        self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...]
    ):
        super().__init__()
        if not isinstance(strategy, FilteredStrategy):
            self.flat_conditions: tuple[Callable[[Ex], Any], ...] = conditions
            self.filtered_strategy: SearchStrategy[Ex] = strategy
        else:
            # Flatten chained filters into a single filter with multiple conditions.
            self.flat_conditions = strategy.flat_conditions + conditions
            self.filtered_strategy = strategy.filtered_strategy

        assert isinstance(self.flat_conditions, tuple)
        assert not isinstance(self.filtered_strategy, FilteredStrategy)

        # Combined predicate, built on demand by the ``condition`` property.
        self.__condition: Callable[[Ex], Any] | None = None

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Delegates entirely to the inner strategy.
        inner = self.filtered_strategy
        return recur(inner)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        # Delegates entirely to the inner strategy.
        inner = self.filtered_strategy
        return recur(inner)

    def __repr__(self) -> str:
        # The repr is built at most once per instance and then reused.
        if hasattr(self, "_cached_repr"):
            return self._cached_repr
        filter_calls = "".join(
            f".filter({get_pretty_function_description(cond)})"
            for cond in self.flat_conditions
        )
        self._cached_repr = "{!r}{}".format(self.filtered_strategy, filter_calls)
        return self._cached_repr

    def do_validate(self) -> None:
        # Start by validating our inner filtered_strategy.  If this was a LazyStrategy,
        # validation also reifies it so that subsequent calls to e.g. `.filter()` will
        # be passed through.
        self.filtered_strategy.validate()
        # So now we have a reified inner strategy, we'll replay all our saved
        # predicates in case some or all of them can be rewritten.  Note that this
        # replaces the `fresh` strategy too!
        fresh = self.filtered_strategy
        for predicate in self.flat_conditions:
            fresh = fresh.filter(predicate)
        if isinstance(fresh, FilteredStrategy):
            # In this case we have at least some non-rewritten filter predicates,
            # so we just re-initialize the strategy.
            inner, remaining = fresh.filtered_strategy, fresh.flat_conditions
        else:
            # But if *all* the predicates were rewritten... well, do_validate() is
            # an in-place method so we still just re-initialize the strategy!
            inner, remaining = fresh, ()
        FilteredStrategy.__init__(self, inner, remaining)

    @overload
    def filter(
        self, condition: Callable[[Ex], TypeGuard[T]]
    ) -> "FilteredStrategy[T]": ...
    @overload
    def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]": ...
    def filter(self, condition):
        # If we can, it's more efficient to rewrite our strategy to satisfy the
        # condition.  We therefore exploit the fact that the order of predicates
        # doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by attempting to apply
        # condition directly to our filtered strategy as the inner-most filter.
        rewritten = self.filtered_strategy.filter(condition)
        if not isinstance(rewritten, FilteredStrategy):
            # It *could* be rewritten, so we can return the more efficient form!
            return FilteredStrategy(rewritten, self.flat_conditions)
        # It couldn't be rewritten, so we got a new FilteredStrategy - combine
        # the conditions of each in our expected newest=last order.
        return FilteredStrategy(
            rewritten.filtered_strategy,
            self.flat_conditions + rewritten.flat_conditions,
        )

    @property
    def condition(self) -> Callable[[Ex], Any]:
        # We write this defensively to avoid any threading race conditions
        # with our manual FilteredStrategy.__init__ for filter-rewriting.
        # See https://github.com/HypothesisWorks/hypothesis/pull/4522.
        condition = self.__condition
        if condition is not None:
            return condition

        if len(self.flat_conditions) == 1:
            # Avoid an extra indirection in the common case of only one condition.
            condition = self.flat_conditions[0]
        elif len(self.flat_conditions) == 0:
            # Possible, if unlikely, due to filter predicate rewriting
            condition = lambda _: True
        else:
            condition = lambda x: all(cond(x) for cond in self.flat_conditions)
        self.__condition = condition
        return condition

    def do_draw(self, data: ConjectureData) -> Ex:
        """Draw a value that satisfies the filter, or mark the test invalid."""
        outcome = self.do_filtered_draw(data)
        if outcome is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
        else:
            return cast(Ex, outcome)

    def do_filtered_draw(self, data: ConjectureData) -> Ex | UniqueIdentifier:
        """Try up to three draws, returning the first value that passes.

        Returns ``filter_not_satisfied`` if every attempt is rejected. Each
        rejected attempt's span is discarded, and the first rejection also
        records a "Retried draw" event on ``data``.
        """
        for attempt in range(3):
            data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
            value = data.draw(self.filtered_strategy)
            if self.condition(value):
                data.stop_span()
                return value
            data.stop_span(discard=True)
            if attempt == 0:
                data.events[f"Retried draw from {self!r} to satisfy filter"] = ""

        return filter_not_satisfied

    @property
    def branches(self) -> Sequence[SearchStrategy[Ex]]:
        # One filtered strategy per branch of the inner strategy, each carrying
        # the same conditions.
        return [
            FilteredStrategy(strategy=branch, conditions=self.flat_conditions)
            for branch in self.filtered_strategy.branches
        ]
1256
1257
@check_function
def check_strategy(arg: object, name: str = "") -> None:
    """Raise ``InvalidArgument`` unless ``arg`` is a ``SearchStrategy``.

    ``name`` is the optional name of the argument being checked; when given
    it is shown as ``name=value`` in the error message. For a list or tuple
    the message additionally suggests ``st.sampled_from``.
    """
    assert isinstance(name, str)
    if isinstance(arg, SearchStrategy):
        return
    hint = ""
    if isinstance(arg, (list, tuple)):
        hint = ", such as st.sampled_from({}),".format(name or "...")
    if name:
        name += "="
    raise InvalidArgument(
        f"Expected a SearchStrategy{hint} but got {name}{arg!r} "
        f"(type={type(arg).__name__})"
    )