1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import dataclasses
12import sys
13import threading
14from functools import partial
15from inspect import signature
16from typing import TYPE_CHECKING, Callable
17
18import attr
19
20from hypothesis.internal.cache import LRUReusedCache
21from hypothesis.internal.floats import clamp, float_to_int
22from hypothesis.internal.reflection import proxies
23from hypothesis.vendor.pretty import pretty
24
25if TYPE_CHECKING:
26 from hypothesis.strategies._internal.strategies import SearchStrategy, T
27
# Module-level registry populated by the decorator that `defines_strategy`
# returns, keyed by the decorated function's `__name__`.
# NOTE(review): the annotation declares callable values, but the only write
# in this file stores the result of `inspect.signature(...)` - confirm which
# of the two is intended before relying on the value type.
_strategies: dict[str, Callable[..., "SearchStrategy"]] = {}
29
30
class FloatKey:
    """Hashable wrapper used in place of a raw float inside cache keys.

    Equality and hashing are based on ``float_to_int(f)`` rather than on the
    float itself (presumably the float's integer bit pattern - confirm
    against ``hypothesis.internal.floats``). A ``FloatKey`` never compares
    equal to anything that is not a ``FloatKey``.
    """

    def __init__(self, f):
        # Keep only the integer form; the original float is not retained.
        self.value = float_to_int(f)

    def __hash__(self):
        return hash(self.value)

    def __eq__(self, other):
        if not isinstance(other, FloatKey):
            return False
        return other.value == self.value

    def __ne__(self, other):
        # Explicit inverse of __eq__, always using this object's comparison.
        return not self.__eq__(other)
43
44
def convert_value(v):
    """Turn one argument value into a component of a cache key.

    Floats are wrapped in ``FloatKey``. Every other value is paired with its
    exact type, so values that compare equal but differ in type (such as
    ``1`` and ``True``) yield different keys.
    """
    return FloatKey(v) if isinstance(v, float) else (type(v), v)
49
50
# Thread-local holder for the strategy cache. `get_cache` lazily attaches an
# `LRUReusedCache` to it as the `STRATEGY_CACHE` attribute, so every thread
# that calls `get_cache` ends up with its own cache instance.
_CACHE = threading.local()
52
53
def get_cache() -> LRUReusedCache:
    """Return the calling thread's strategy cache, creating it on first use.

    The cache is an ``LRUReusedCache`` constructed with an argument of 1024
    and stored on the thread-local ``_CACHE`` object.
    """
    if not hasattr(_CACHE, "STRATEGY_CACHE"):
        _CACHE.STRATEGY_CACHE = LRUReusedCache(1024)
    return _CACHE.STRATEGY_CACHE
60
61
def clear_cache() -> None:
    """Empty the strategy cache belonging to the calling thread."""
    get_cache().clear()
65
66
def cacheable(fn: "T") -> "T":
    """Wrap ``fn`` so that calls with equal, hashable arguments share a result.

    Results live in the per-thread cache returned by ``get_cache()``, keyed
    on ``fn`` plus the positional and keyword arguments after each has been
    passed through ``convert_value``. The wrapper falls back to calling
    ``fn`` directly, without touching the cache, when:

    * the current build context's provider has ``avoid_realization`` set, or
    * building or looking up the cache key raises ``TypeError`` (e.g. an
      unhashable argument).

    A freshly computed result is stored - and flagged with
    ``_is_singleton = True`` - unless it is a ``SearchStrategy`` whose
    ``is_cacheable`` is false.

    The returned wrapper also exposes ``clear_cache`` as its
    ``__clear_cache`` attribute.
    """
    from hypothesis.control import _current_build_context
    from hypothesis.strategies._internal.strategies import SearchStrategy

    @proxies(fn)
    def cached_strategy(*args, **kwargs):
        ctx = _current_build_context.value
        if ctx is not None and ctx.data.provider.avoid_realization:
            # Bypass the cache entirely while realization must be avoided.
            return fn(*args, **kwargs)

        try:
            # Hashing each (name, value) pair here is what surfaces
            # unhashable keyword arguments as a TypeError.
            keyword_part = frozenset(
                (name, convert_value(value)) for name, value in kwargs.items()
            )
        except TypeError:
            return fn(*args, **kwargs)

        positional_part = tuple(convert_value(arg) for arg in args)
        key = (fn, positional_part, keyword_part)
        cache = get_cache()
        try:
            # The membership test hashes the whole key, so unhashable
            # positional arguments are detected at this point.
            if key in cache:
                return cache[key]
        except TypeError:
            return fn(*args, **kwargs)

        strategy = fn(*args, **kwargs)
        if isinstance(strategy, SearchStrategy) and not strategy.is_cacheable:
            # Explicitly non-cacheable strategies are returned untouched.
            return strategy
        strategy._is_singleton = True
        cache[key] = strategy
        return strategy

    cached_strategy.__clear_cache = clear_cache  # type: ignore
    return cached_strategy
97
98
def defines_strategy(
    *,
    force_reusable_values: bool = False,
    try_non_lazy: bool = False,
    never_lazy: bool = False,
) -> Callable[["T"], "T"]:
    """Build a decorator for functions that define strategies.

    ``force_reusable_values``: when True, the strategy that comes back is
    marked with ``.has_reusable_values == True`` even if it relies on
    maps/filters or non-reusable strategies internally. This signals to our
    numpy/pandas strategies that it may implicitly be used for background
    values.

    ``try_non_lazy``: when True, the strategy definition function is invoked
    straight away, and a LazyStrategy is returned only if that invocation
    raises an exception.

    ``never_lazy``: when True, no lazy-wrapping is performed at all and the
    decorator simply returns the original function. This cannot be combined
    with either of the other two options.
    """

    def decorator(strategy_definition):
        """Record the function's signature in ``_strategies`` and return the
        wrapper (or, with ``never_lazy``, the function itself)."""
        _strategies[strategy_definition.__name__] = signature(strategy_definition)

        if never_lazy:
            # never_lazy + force_reusable_values could in principle be
            # supported through a suitable wrapper, but no caller currently
            # asks for that combination.
            assert not (try_non_lazy or force_reusable_values)
            return strategy_definition

        from hypothesis.strategies._internal.lazy import LazyStrategy

        @proxies(strategy_definition)
        def accept(*args, **kwargs):
            if try_non_lazy:
                # Eager evaluation is opt-in rather than unconditional:
                # always trying it would produce very deep nesting of
                # recursive strategies, so stay lazy unless we *know* eager
                # evaluation is the right choice.
                try:
                    return strategy_definition(*args, **kwargs)
                except Exception:
                    # Defer the failure: the LazyStrategy built below will
                    # hit the same exception again when it is evaluated.
                    pass
            lazy = LazyStrategy(strategy_definition, args, kwargs)
            if force_reusable_values:
                # This flag makes the recursive property code report
                # `.has_reusable_values == True`.
                lazy.force_has_reusable_values = True
                assert lazy.has_reusable_values
            return lazy

        accept.is_hypothesis_strategy_function = True
        return accept

    return decorator
159
160
def to_jsonable(obj: object, *, avoid_realization: bool) -> object:
    """Recursively convert an object to json-encodable form.

    This is not intended to round-trip, but rather to provide an
    analysis-ready format for observability. To avoid side effects, we
    pretty-print everything except the known types handled below.

    ``obj`` is the value to convert. ``avoid_realization`` is keyword-only;
    when it is true, every value other than a str/int/float/bool/None whose
    magnitude is below 2**63 is replaced by the string ``"<symbolic>"``
    instead of being inspected.

    Conversion rules, in order:

    * str, float, bool, None and ints below 2**63 in magnitude are returned
      unchanged; larger ints are clamped to the finite float range and
      returned as floats.
    * namedtuples are converted like dicts of their fields; other lists,
      tuples, sets and frozensets become lists.
    * dicts keep string keys, pretty-print all other keys, and convert
      their values recursively.
    * objects with a working ``to_json()`` method are converted from its
      return value; any exception raised along the way is ignored and the
      remaining rules are tried instead.
    * dataclass instances, attrs instances and pydantic models become dicts
      of their fields.
    * anything else is returned as its pretty-printed string.
    """
    if isinstance(obj, (str, int, float, bool, type(None))):
        # We convert integers with magnitude >= 2**63 to floats, to avoid
        # crashing external utilities with a 64 bit integer cap (notably,
        # sqlite). See
        # https://github.com/HypothesisWorks/hypothesis/pull/3797#discussion_r1413425110
        # and https://github.com/simonw/sqlite-utils/issues/605.
        if isinstance(obj, int) and not isinstance(obj, bool) and abs(obj) >= 2**63:
            # Silently clamp very large ints to max_float, to avoid
            # OverflowError when casting to float. (but avoid adding more
            # constraints to symbolic values)
            if avoid_realization:
                return "<symbolic>"
            obj = clamp(-sys.float_info.max, obj, sys.float_info.max)
            return float(obj)
        return obj
    if avoid_realization:
        return "<symbolic>"

    recur = partial(to_jsonable, avoid_realization=avoid_realization)
    if isinstance(obj, (list, tuple, set, frozenset)):
        if isinstance(obj, tuple) and hasattr(obj, "_asdict"):
            return recur(obj._asdict())  # treat namedtuples as dicts
        return [recur(x) for x in obj]
    if isinstance(obj, dict):
        return {
            k if isinstance(k, str) else pretty(k): recur(v) for k, v in obj.items()
        }

    # Hey, might as well try calling a .to_json() method - it works for Pandas!
    # We try this before the below general-purpose handlers to give folks a
    # chance to control this behavior on their custom classes. This is a
    # deliberate best-effort: any failure just falls through to the handlers
    # below.
    try:
        return recur(obj.to_json())  # type: ignore
    except Exception:
        pass

    # Special handling for dataclasses, attrs, and pydantic classes.
    # `dataclasses` is imported at module level, so use it directly for both
    # the check and the field listing. Dataclass *types* (as opposed to
    # instances) are excluded and end up pretty-printed.
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        # Avoid dataclasses.asdict here to ensure that inner to_json overrides
        # can get called as well
        return {
            field.name: recur(getattr(obj, field.name))
            for field in dataclasses.fields(obj)  # type: ignore
        }
    if attr.has(type(obj)):
        return recur(attr.asdict(obj, recurse=False))  # type: ignore
    # Only consult pydantic if something else has already imported it.
    if (pyd := sys.modules.get("pydantic")) and isinstance(obj, pyd.BaseModel):
        # pydantic v2 spells this `model_dump()`; v1 models only offer
        # `dict()`, so fall back to that rather than raising AttributeError.
        dump = getattr(obj, "model_dump", None) or obj.dict
        return recur(dump())

    # If all else fails, we'll just pretty-print as a string.
    return pretty(obj)