1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11from collections.abc import Sequence
12from inspect import signature
13from typing import Any, Callable, Optional
14from weakref import WeakKeyDictionary
15
16from hypothesis.configuration import check_sideeffect_during_initialization
17from hypothesis.internal.conjecture.data import ConjectureData
18from hypothesis.internal.reflection import (
19 convert_keyword_arguments,
20 convert_positional_arguments,
21 get_pretty_function_description,
22 repr_call,
23)
24from hypothesis.strategies._internal.deferred import DeferredStrategy
25from hypothesis.strategies._internal.strategies import Ex, RecurT, SearchStrategy
26from hypothesis.utils.threading import ThreadLocal
27
# State shared by nested ``unwrap_strategies`` calls: ``unwrap_depth`` counts
# the calls currently in progress, and ``unwrap_cache`` holds their results
# until the outermost call returns and clears it.
# NOTE(review): presumably every thread gets its own copy of these values,
# initialised by calling ``int`` / ``WeakKeyDictionary`` -- confirm against
# ``hypothesis.utils.threading.ThreadLocal``.
threadlocal = ThreadLocal(unwrap_depth=int, unwrap_cache=WeakKeyDictionary)
29
30
def unwrap_strategies(s):
    """Return the strategy that ``s`` ultimately wraps.

    Anything that is not a ``LazyStrategy`` or ``DeferredStrategy`` is
    returned unchanged.  Otherwise ``s.wrapped_strategy`` is unwrapped
    recursively, and the answer is remembered in ``threadlocal.unwrap_cache``
    until the outermost call finishes, at which point the cache is cleared.
    """
    # Fast path: only lazy and deferred strategies have anything to unwrap.
    if not isinstance(s, (LazyStrategy, DeferredStrategy)):
        return s

    try:
        cached = threadlocal.unwrap_cache[s]
    except KeyError:
        pass
    else:
        return cached

    # Provisionally map ``s`` to itself before recursing, so that a nested
    # request for ``s`` is answered from the cache with ``s`` itself instead
    # of recursing again.
    threadlocal.unwrap_cache[s] = s
    threadlocal.unwrap_depth += 1

    try:
        unwrapped = unwrap_strategies(s.wrapped_strategy)
        threadlocal.unwrap_cache[s] = unwrapped

        # When both objects expose ``force_has_reusable_values``, the two
        # values are expected to agree.
        try:
            assert unwrapped.force_has_reusable_values == s.force_has_reusable_values
        except AttributeError:
            pass

        # Carry the wrapper's value over to the unwrapped strategy, if the
        # wrapper has one and the target accepts it.
        try:
            unwrapped.force_has_reusable_values = s.force_has_reusable_values
        except AttributeError:
            pass

        return unwrapped
    finally:
        # Leaving one level of unwrapping; once the outermost call is done
        # the cache is dropped.
        threadlocal.unwrap_depth -= 1
        if threadlocal.unwrap_depth <= 0:
            threadlocal.unwrap_cache.clear()
        assert threadlocal.unwrap_depth >= 0
64
65
class LazyStrategy(SearchStrategy[Ex]):
    """A strategy that stands in for the result of ``function(*args, **kwargs)``.

    The call is postponed until ``wrapped_strategy`` is first accessed.
    Drawing, validation, labelling and the emptiness / reusable-values
    calculations are then delegated to that strategy.

    ``map`` and ``filter`` return a new ``LazyStrategy`` which records the
    transformation and applies it when its own wrapped strategy is built.
    """

    def __init__(
        self,
        function: Callable[..., SearchStrategy[Ex]],
        args: Sequence[object],
        kwargs: dict[str, object],
        *,
        transforms: tuple[tuple[str, Callable[..., Any]], ...] = (),
        force_repr: Optional[str] = None,
    ):
        super().__init__()
        # The deferred call, plus the (method name, callable) pairs to apply
        # to its result, in order.
        self.function = function
        self.__args = args
        self.__kwargs = kwargs
        self._transformations = transforms
        # The repr is computed on demand unless ``force_repr`` supplies it;
        # the wrapped strategy is always computed on demand.
        self.__representation: Optional[str] = force_repr
        self.__wrapped_strategy: Optional[SearchStrategy[Ex]] = None

    @property
    def supports_find(self) -> bool:
        """Whatever the wrapped strategy reports for ``supports_find``."""
        wrapped = self.wrapped_strategy
        return wrapped.supports_find

    def calc_is_empty(self, recur: RecurT) -> bool:
        """Delegate the emptiness calculation to the wrapped strategy."""
        wrapped = self.wrapped_strategy
        return recur(wrapped)

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        """Delegate the reusable-values calculation to the wrapped strategy."""
        wrapped = self.wrapped_strategy
        return recur(wrapped)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        """Return False if any strategy passed as an argument is not cacheable.

        Only the stored positional and keyword argument values are inspected;
        values that are not ``SearchStrategy`` instances are ignored.
        """
        return all(
            value.is_cacheable
            for group in (self.__args, self.__kwargs.values())
            for value in group
            if isinstance(value, SearchStrategy)
        )

    def calc_label(self) -> int:
        """Use the wrapped strategy's label as this strategy's label."""
        wrapped = self.wrapped_strategy
        return wrapped.label

    @property
    def wrapped_strategy(self) -> SearchStrategy[Ex]:
        """The strategy produced by the deferred call, built on first access.

        The function is always called with the original arguments.  If
        unwrapping any argument yields something that compares unequal to the
        original arguments, the function is called again with the unwrapped
        arguments and that second result is used instead.  The recorded
        transformations are then applied in order and the result is stored
        for subsequent accesses.
        """
        if self.__wrapped_strategy is not None:
            return self.__wrapped_strategy

        check_sideeffect_during_initialization("lazy evaluation of {!r}", self)

        unwrapped_args = tuple(unwrap_strategies(arg) for arg in self.__args)
        unwrapped_kwargs = {
            name: unwrap_strategies(value) for name, value in self.__kwargs.items()
        }

        base = self.function(*self.__args, **self.__kwargs)
        nothing_unwrapped = (
            unwrapped_args == self.__args and unwrapped_kwargs == self.__kwargs
        )
        strategy = (
            base
            if nothing_unwrapped
            else self.function(*unwrapped_args, **unwrapped_kwargs)
        )
        # Replay the recorded ``map`` / ``filter`` calls on the fresh strategy.
        for method, fn in self._transformations:
            strategy = getattr(strategy, method)(fn)

        self.__wrapped_strategy = strategy
        assert self.__wrapped_strategy is not None
        return self.__wrapped_strategy

    def __with_transform(self, method, fn):
        """Return a new ``LazyStrategy`` with ``(method, fn)`` appended.

        A non-empty stored representation is extended with the new call; an
        absent or empty one is passed through unchanged.
        """
        described = self.__representation
        if described:
            described = f"{described}.{method}({get_pretty_function_description(fn)})"
        extended = (*self._transformations, (method, fn))
        return LazyStrategy(
            self.function,
            self.__args,
            self.__kwargs,
            transforms=extended,
            force_repr=described,
        )

    def map(self, pack):
        """Lazily record a ``map`` of ``pack`` over this strategy."""
        return self.__with_transform("map", pack)

    def filter(self, condition):
        """Lazily record a ``filter`` by ``condition`` on this strategy."""
        return self.__with_transform("filter", condition)

    def do_validate(self) -> None:
        """Check that the deferred call produced a strategy, then validate it."""
        wrapped = self.wrapped_strategy
        assert isinstance(
            wrapped, SearchStrategy
        ), f"{self!r} returned non-strategy {wrapped!r}"
        wrapped.validate()

    def __repr__(self) -> str:
        """Return (and remember) a call-like description of this strategy."""
        if self.__representation is not None:
            return self.__representation

        sig = signature(self.function)
        positional = [
            param
            for param in sig.parameters.values()
            if "POSITIONAL" in param.kind.name
        ]
        # Pick which reflection helper normalises the stored arguments, based
        # on how many positional parameters the function has and whether any
        # of them has a default.
        use_positional_converter = len(positional) > 1 or any(
            param.default is not sig.empty for param in positional
        )
        convert = (
            convert_positional_arguments
            if use_positional_converter
            else convert_keyword_arguments
        )
        _args, _kwargs = convert(self.function, self.__args, self.__kwargs)

        # Leave out keyword arguments whose value is (by identity) the
        # declared default of the parameter with that name.
        shown_kwargs = {
            name: value
            for name, value in _kwargs.items()
            if name not in sig.parameters
            or value is not sig.parameters[name].default
        }
        call_repr = repr_call(self.function, _args, shown_kwargs, reorder=False)
        transform_repr = "".join(
            f".{method}({get_pretty_function_description(fn)})"
            for method, fn in self._transformations
        )
        self.__representation = call_repr + transform_repr
        return self.__representation

    def do_draw(self, data: ConjectureData) -> Ex:
        """Draw a value by drawing from the wrapped strategy."""
        wrapped = self.wrapped_strategy
        return data.draw(wrapped)