1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11from collections.abc import Sequence
12from inspect import signature
13from typing import Any, Callable, Optional
14from weakref import WeakKeyDictionary
15
16from hypothesis.configuration import check_sideeffect_during_initialization
17from hypothesis.internal.conjecture.data import ConjectureData
18from hypothesis.internal.reflection import (
19 convert_keyword_arguments,
20 convert_positional_arguments,
21 get_pretty_function_description,
22 repr_call,
23)
24from hypothesis.strategies._internal.deferred import DeferredStrategy
25from hypothesis.strategies._internal.strategies import Ex, RecurT, SearchStrategy
26from hypothesis.utils.threading import ThreadLocal
27
28threadlocal = ThreadLocal(unwrap_depth=int, unwrap_cache=WeakKeyDictionary)
29
30
31def unwrap_strategies(s):
32 # optimization
33 if not isinstance(s, (LazyStrategy, DeferredStrategy)):
34 return s
35
36 try:
37 return threadlocal.unwrap_cache[s]
38 except KeyError:
39 pass
40
41 threadlocal.unwrap_cache[s] = s
42 threadlocal.unwrap_depth += 1
43
44 try:
45 result = unwrap_strategies(s.wrapped_strategy)
46 threadlocal.unwrap_cache[s] = result
47
48 try:
49 assert result.force_has_reusable_values == s.force_has_reusable_values
50 except AttributeError:
51 pass
52
53 try:
54 result.force_has_reusable_values = s.force_has_reusable_values
55 except AttributeError:
56 pass
57
58 return result
59 finally:
60 threadlocal.unwrap_depth -= 1
61 if threadlocal.unwrap_depth <= 0:
62 threadlocal.unwrap_cache.clear()
63 assert threadlocal.unwrap_depth >= 0
64
65
66class LazyStrategy(SearchStrategy[Ex]):
67 """A strategy which is defined purely by conversion to and from another
68 strategy.
69
70 Its parameter and distribution come from that other strategy.
71 """
72
73 def __init__(
74 self,
75 function: Callable[..., SearchStrategy[Ex]],
76 args: Sequence[object],
77 kwargs: dict[str, object],
78 *,
79 transforms: tuple[tuple[str, Callable[..., Any]], ...] = (),
80 force_repr: Optional[str] = None,
81 ):
82 super().__init__()
83 self.__wrapped_strategy: Optional[SearchStrategy[Ex]] = None
84 self.__representation: Optional[str] = force_repr
85 self.function = function
86 self.__args = args
87 self.__kwargs = kwargs
88 self._transformations = transforms
89
90 def calc_is_empty(self, recur: RecurT) -> bool:
91 return recur(self.wrapped_strategy)
92
93 def calc_has_reusable_values(self, recur: RecurT) -> bool:
94 return recur(self.wrapped_strategy)
95
96 def calc_is_cacheable(self, recur: RecurT) -> bool:
97 for source in (self.__args, self.__kwargs.values()):
98 for v in source:
99 if isinstance(v, SearchStrategy) and not v.is_cacheable:
100 return False
101 return True
102
103 def calc_label(self) -> int:
104 return self.wrapped_strategy.label
105
106 @property
107 def wrapped_strategy(self) -> SearchStrategy[Ex]:
108 if self.__wrapped_strategy is None:
109 check_sideeffect_during_initialization("lazy evaluation of {!r}", self)
110
111 unwrapped_args = tuple(unwrap_strategies(s) for s in self.__args)
112 unwrapped_kwargs = {
113 k: unwrap_strategies(v) for k, v in self.__kwargs.items()
114 }
115
116 base = self.function(*self.__args, **self.__kwargs)
117 if unwrapped_args == self.__args and unwrapped_kwargs == self.__kwargs:
118 _wrapped_strategy = base
119 else:
120 _wrapped_strategy = self.function(*unwrapped_args, **unwrapped_kwargs)
121 for method, fn in self._transformations:
122 _wrapped_strategy = getattr(_wrapped_strategy, method)(fn)
123 self.__wrapped_strategy = _wrapped_strategy
124 assert self.__wrapped_strategy is not None
125 return self.__wrapped_strategy
126
127 def __with_transform(self, method, fn):
128 repr_ = self.__representation
129 if repr_:
130 repr_ = f"{repr_}.{method}({get_pretty_function_description(fn)})"
131 return LazyStrategy(
132 self.function,
133 self.__args,
134 self.__kwargs,
135 transforms=(*self._transformations, (method, fn)),
136 force_repr=repr_,
137 )
138
139 def map(self, pack):
140 return self.__with_transform("map", pack)
141
142 def filter(self, condition):
143 return self.__with_transform("filter", condition)
144
145 def do_validate(self) -> None:
146 w = self.wrapped_strategy
147 assert isinstance(w, SearchStrategy), f"{self!r} returned non-strategy {w!r}"
148 w.validate()
149
150 def __repr__(self) -> str:
151 if self.__representation is None:
152 sig = signature(self.function)
153 pos = [p for p in sig.parameters.values() if "POSITIONAL" in p.kind.name]
154 if len(pos) > 1 or any(p.default is not sig.empty for p in pos):
155 _args, _kwargs = convert_positional_arguments(
156 self.function, self.__args, self.__kwargs
157 )
158 else:
159 _args, _kwargs = convert_keyword_arguments(
160 self.function, self.__args, self.__kwargs
161 )
162 kwargs_for_repr = {
163 k: v
164 for k, v in _kwargs.items()
165 if k not in sig.parameters or v is not sig.parameters[k].default
166 }
167 self.__representation = repr_call(
168 self.function, _args, kwargs_for_repr, reorder=False
169 ) + "".join(
170 f".{method}({get_pretty_function_description(fn)})"
171 for method, fn in self._transformations
172 )
173 return self.__representation
174
175 def do_draw(self, data: ConjectureData) -> Ex:
176 return data.draw(self.wrapped_strategy)