1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import sys
12import threading
13from functools import partial
14from inspect import signature
15from typing import TYPE_CHECKING, Callable
16
17import attr
18
19from hypothesis.internal.cache import LRUReusedCache
20from hypothesis.internal.compat import dataclass_asdict
21from hypothesis.internal.floats import clamp, float_to_int
22from hypothesis.internal.reflection import proxies
23from hypothesis.vendor.pretty import pretty
24
25if TYPE_CHECKING:
26 from hypothesis.strategies._internal.strategies import SearchStrategy, T
27
# Registry populated by the ``defines_strategy`` decorator, keyed by the
# decorated function's ``__name__``.
# NOTE(review): ``defines_strategy`` stores ``inspect.signature(...)`` results
# here rather than callables, so the value annotation looks out of date -
# confirm before relying on it.
_strategies: dict[str, Callable[..., "SearchStrategy"]] = {}
29
30
class FloatKey:
    """Hashable stand-in for a float, used when building cache keys.

    The float is stored as ``float_to_int(f)`` and instances compare and hash
    by that integer instead of by the float itself (presumably so that floats
    with distinct representations yield distinct keys - confirm against
    ``float_to_int``).
    """

    def __init__(self, f):
        self.value = float_to_int(f)

    def __hash__(self):
        return hash(self.value)

    def __eq__(self, other):
        # Anything that is not a FloatKey is simply unequal.
        if not isinstance(other, FloatKey):
            return False
        return other.value == self.value

    def __ne__(self, other):
        return not self.__eq__(other)
43
44
def convert_value(v):
    """Turn an argument value into the form used inside cache keys.

    Floats are wrapped in ``FloatKey``. Every other value is paired with its
    exact type, so values that compare equal across types (such as ``1`` and
    ``True``) still produce different keys.
    """
    return FloatKey(v) if isinstance(v, float) else (type(v), v)
49
50
# Thread-local holder for the strategy cache: ``get_cache`` attaches a
# ``STRATEGY_CACHE`` attribute to it the first time each thread asks for one.
_CACHE = threading.local()
52
53
def get_cache() -> LRUReusedCache:
    """Return the calling thread's strategy cache, creating it on first use.

    The cache lives on the thread-local ``_CACHE`` object and is an
    ``LRUReusedCache`` constructed with the argument 1024.
    """
    if not hasattr(_CACHE, "STRATEGY_CACHE"):
        _CACHE.STRATEGY_CACHE = LRUReusedCache(1024)
    return _CACHE.STRATEGY_CACHE
60
61
def clear_cache() -> None:
    """Empty the calling thread's strategy cache (creating it first if needed)."""
    get_cache().clear()
65
66
def cacheable(fn: "T") -> "T":
    """Wrap ``fn`` so that repeated calls with equal arguments reuse one result.

    Results are stored in the calling thread's cache (see ``get_cache``),
    keyed on ``fn`` plus the converted positional and keyword arguments.
    The wrapper calls ``fn`` directly, without touching the cache, when:

    * there is a current build context whose data provider sets
      ``avoid_realization``; or
    * hashing the arguments raises ``TypeError``.

    A result that is a ``SearchStrategy`` is only stored if its
    ``is_cacheable`` attribute is truthy; any other result is always stored.
    """
    from hypothesis.control import _current_build_context
    from hypothesis.strategies._internal.strategies import SearchStrategy

    @proxies(fn)
    def cached_strategy(*args, **kwargs):
        build_context = _current_build_context.value
        if (
            build_context is not None
            and build_context.data.provider.avoid_realization
        ):
            return fn(*args, **kwargs)

        # Building the frozenset hashes every (name, value) pair, so an
        # unhashable keyword argument is detected right here.
        try:
            keyed_kwargs = frozenset(
                (name, convert_value(value)) for name, value in kwargs.items()
            )
        except TypeError:
            return fn(*args, **kwargs)

        key = (fn, tuple(convert_value(arg) for arg in args), keyed_kwargs)
        strategy_cache = get_cache()

        # Positional arguments are first hashed by this lookup, which is why
        # it needs its own TypeError guard.
        try:
            if key in strategy_cache:
                return strategy_cache[key]
        except TypeError:
            return fn(*args, **kwargs)

        # Cache miss: compute the value, then decide whether to remember it.
        result = fn(*args, **kwargs)
        if isinstance(result, SearchStrategy):
            should_store = result.is_cacheable
        else:
            should_store = True
        if should_store:
            strategy_cache[key] = result
        return result

    cached_strategy.__clear_cache = clear_cache  # type: ignore
    return cached_strategy
96
97
def defines_strategy(
    *,
    force_reusable_values: bool = False,
    try_non_lazy: bool = False,
    never_lazy: bool = False,
) -> Callable[["T"], "T"]:
    """Build a decorator for functions that define strategies.

    By default, calling the decorated function returns a ``LazyStrategy``
    that records the call instead of running the definition straight away.
    The keyword-only flags adjust this:

    * ``force_reusable_values``: mark the returned lazy strategy so that
      ``.has_reusable_values`` is True even if it uses maps/filters or
      non-reusable strategies internally. This tells our numpy/pandas
      strategies that they can implicitly use such strategies as background
      values.
    * ``try_non_lazy``: run the definition eagerly and return its result,
      falling back to a ``LazyStrategy`` only if that call raises.
    * ``never_lazy``: do no wrapping at all and hand back the original
      function. Asserted to be incompatible with the other two flags.

    In every case the function's signature is recorded in ``_strategies``
    under the function's ``__name__``.
    """

    def decorator(strategy_definition):
        """Register the function as a strategy and, unless ``never_lazy`` is
        set, make it lazily evaluated."""
        _strategies[strategy_definition.__name__] = signature(strategy_definition)

        if never_lazy:
            # never_lazy + force_reusable_values could be supported with a
            # suitable wrapper, but no caller asks for that combination today.
            assert not try_non_lazy
            assert not force_reusable_values
            return strategy_definition

        from hypothesis.strategies._internal.lazy import LazyStrategy

        def make_lazy(args, kwargs):
            # Defer the real call: LazyStrategy keeps the definition and its
            # arguments around for later.
            lazy = LazyStrategy(strategy_definition, args, kwargs)
            if force_reusable_values:
                # Setting `force_has_reusable_values` here causes the recursive
                # property code to set `.has_reusable_values == True`.
                lazy.force_has_reusable_values = True
                assert lazy.has_reusable_values
            return lazy

        @proxies(strategy_definition)
        def accept(*args, **kwargs):
            # Eager evaluation is opt-in: doing it unconditionally would give
            # very deep nesting of recursive strategies, so stay lazy unless
            # we *know* that eagerness is the right choice.
            if try_non_lazy:
                try:
                    return strategy_definition(*args, **kwargs)
                except Exception:
                    # Fall through to the lazy wrapper, so that the same
                    # failure happens again later when it is evaluated.
                    pass
            return make_lazy(args, kwargs)

        accept.is_hypothesis_strategy_function = True
        return accept

    return decorator
158
159
def to_jsonable(obj: object, *, avoid_realization: bool) -> object:
    """Recursively convert an object to json-encodable form.

    This is not intended to round-trip, but rather provide an analysis-ready
    format for observability. To avoid side effects, we pretty-print all but
    known types.

    Conversion rules, in order:

    * ``str``/``int``/``float``/``bool``/``None`` are returned unchanged,
      except that non-bool ints with ``abs(obj) >= 2**63`` are clamped to the
      finite float range and returned as a ``float``.
    * If ``avoid_realization`` is true, anything else (including such a large
      int) becomes the string ``"<symbolic>"``.
    * Lists, tuples, sets and frozensets become lists; namedtuples and dicts
      become dicts, with non-string keys pretty-printed.
    * An object whose ``.to_json()`` call succeeds is converted from that
      result.
    * Dataclass instances, attrs instances and pydantic models are converted
      from their field dictionaries.
    * Everything else is pretty-printed to a string.
    """
    if isinstance(obj, (str, int, float, bool, type(None))):
        if isinstance(obj, int) and not isinstance(obj, bool) and abs(obj) >= 2**63:
            # Silently clamp very large ints to max_float, to avoid OverflowError when
            # casting to float. (but avoid adding more constraints to symbolic values)
            if avoid_realization:
                return "<symbolic>"
            obj = clamp(-sys.float_info.max, obj, sys.float_info.max)
            return float(obj)
        return obj
    if avoid_realization:
        return "<symbolic>"
    recur = partial(to_jsonable, avoid_realization=avoid_realization)
    if isinstance(obj, (list, tuple, set, frozenset)):
        if isinstance(obj, tuple) and hasattr(obj, "_asdict"):
            return recur(obj._asdict())  # treat namedtuples as dicts
        return [recur(x) for x in obj]
    if isinstance(obj, dict):
        return {
            k if isinstance(k, str) else pretty(k): recur(v) for k, v in obj.items()
        }

    # Hey, might as well try calling a .to_json() method - it works for Pandas!
    # We try this before the below general-purpose handlers to give folks a
    # chance to control this behavior on their custom classes.
    try:
        return recur(obj.to_json())  # type: ignore
    except Exception:
        pass

    # Special handling for dataclasses, attrs, and pydantic classes.
    # Modules are looked up in sys.modules so nothing is imported on our behalf.
    if (
        (dcs := sys.modules.get("dataclasses"))
        and dcs.is_dataclass(obj)
        and not isinstance(obj, type)
    ):
        return recur(dataclass_asdict(obj))
    if attr.has(type(obj)):
        return recur(attr.asdict(obj, recurse=False))  # type: ignore
    if (pyd := sys.modules.get("pydantic")) and isinstance(obj, pyd.BaseModel):
        # pydantic v2 models expose .model_dump(); v1 models only provide
        # .dict(). Fall back to the latter instead of raising AttributeError
        # when an older pydantic is installed.
        dump = obj.model_dump if hasattr(obj, "model_dump") else obj.dict
        return recur(dump())

    # If all else fails, we'll just pretty-print as a string.
    return pretty(obj)