1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import dataclasses
12import sys
13import threading
14from functools import partial
15from inspect import signature
16from typing import TYPE_CHECKING, Callable, TypeVar
17
18import attr
19
20from hypothesis.internal.cache import LRUReusedCache
21from hypothesis.internal.floats import clamp, float_to_int
22from hypothesis.internal.reflection import proxies
23from hypothesis.vendor.pretty import pretty
24
25if TYPE_CHECKING:
26 from typing import TypeAlias
27
28 from hypothesis.strategies._internal.strategies import SearchStrategy
29
30T = TypeVar("T")
31ValueKey: "TypeAlias" = tuple[type, object]
32# (fn, args, kwargs)
33StrategyCacheKey: "TypeAlias" = tuple[
34 object, tuple[ValueKey, ...], frozenset[tuple[str, ValueKey]]
35]
36
37_strategies: dict[str, Callable[..., "SearchStrategy"]] = {}
38_CACHE = threading.local()
39
40
41def convert_value(v: object) -> ValueKey:
42 if isinstance(v, float):
43 return (float, float_to_int(v))
44 return (type(v), v)
45
46
47def get_cache() -> LRUReusedCache[StrategyCacheKey, object]:
48 try:
49 return _CACHE.STRATEGY_CACHE
50 except AttributeError:
51 _CACHE.STRATEGY_CACHE = LRUReusedCache[StrategyCacheKey, object](1024)
52 return _CACHE.STRATEGY_CACHE
53
54
55def clear_cache() -> None:
56 cache = get_cache()
57 cache.clear()
58
59
60def cacheable(fn: T) -> T:
61 from hypothesis.control import _current_build_context
62 from hypothesis.strategies._internal.strategies import SearchStrategy
63
64 @proxies(fn)
65 def cached_strategy(*args, **kwargs):
66 context = _current_build_context.value
67 if context is not None and context.data.provider.avoid_realization:
68 return fn(*args, **kwargs)
69
70 try:
71 kwargs_cache_key = {(k, convert_value(v)) for k, v in kwargs.items()}
72 except TypeError:
73 return fn(*args, **kwargs)
74 cache_key = (fn, tuple(map(convert_value, args)), frozenset(kwargs_cache_key))
75 cache = get_cache()
76 try:
77 if cache_key in cache:
78 return cache[cache_key]
79 except TypeError:
80 return fn(*args, **kwargs)
81 else:
82 result = fn(*args, **kwargs)
83 if not isinstance(result, SearchStrategy) or result.is_cacheable:
84 result._is_singleton = True
85 cache[cache_key] = result
86 return result
87
88 cached_strategy.__clear_cache = clear_cache # type: ignore
89 return cached_strategy
90
91
92def defines_strategy(
93 *,
94 force_reusable_values: bool = False,
95 try_non_lazy: bool = False,
96 never_lazy: bool = False,
97) -> Callable[[T], T]:
98 """Returns a decorator for strategy functions.
99
100 If ``force_reusable_values`` is True, the returned strategy will be marked
101 with ``.has_reusable_values == True`` even if it uses maps/filters or
102 non-reusable strategies internally. This tells our numpy/pandas strategies
103 that they can implicitly use such strategies as background values.
104
105 If ``try_non_lazy`` is True, attempt to execute the strategy definition
106 function immediately, so that a LazyStrategy is only returned if this
107 raises an exception.
108
109 If ``never_lazy`` is True, the decorator performs no lazy-wrapping at all,
110 and instead returns the original function.
111 """
112
113 def decorator(strategy_definition):
114 """A decorator that registers the function as a strategy and makes it
115 lazily evaluated."""
116 _strategies[strategy_definition.__name__] = signature(strategy_definition)
117
118 if never_lazy:
119 assert not try_non_lazy
120 # We could potentially support never_lazy + force_reusable_values
121 # with a suitable wrapper, but currently there are no callers that
122 # request this combination.
123 assert not force_reusable_values
124 return strategy_definition
125
126 from hypothesis.strategies._internal.lazy import LazyStrategy
127
128 @proxies(strategy_definition)
129 def accept(*args, **kwargs):
130 if try_non_lazy:
131 # Why not try this unconditionally? Because we'd end up with very
132 # deep nesting of recursive strategies - better to be lazy unless we
133 # *know* that eager evaluation is the right choice.
134 try:
135 return strategy_definition(*args, **kwargs)
136 except Exception:
137 # If invoking the strategy definition raises an exception,
138 # wrap that up in a LazyStrategy so it happens again later.
139 pass
140 result = LazyStrategy(strategy_definition, args, kwargs)
141 if force_reusable_values:
142 # Setting `force_has_reusable_values` here causes the recursive
143 # property code to set `.has_reusable_values == True`.
144 result.force_has_reusable_values = True
145 assert result.has_reusable_values
146 return result
147
148 accept.is_hypothesis_strategy_function = True
149 return accept
150
151 return decorator
152
153
154def to_jsonable(obj: object, *, avoid_realization: bool) -> object:
155 """Recursively convert an object to json-encodable form.
156
157 This is not intended to round-trip, but rather provide an analysis-ready
158 format for observability. To avoid side affects, we pretty-print all but
159 known types.
160 """
161 if isinstance(obj, (str, int, float, bool, type(None))):
162 # We convert integers of 2**63 to floats, to avoid crashing external
163 # utilities with a 64 bit integer cap (notable, sqlite). See
164 # https://github.com/HypothesisWorks/hypothesis/pull/3797#discussion_r1413425110
165 # and https://github.com/simonw/sqlite-utils/issues/605.
166 if isinstance(obj, int) and not isinstance(obj, bool) and abs(obj) >= 2**63:
167 # Silently clamp very large ints to max_float, to avoid OverflowError when
168 # casting to float. (but avoid adding more constraints to symbolic values)
169 if avoid_realization:
170 return "<symbolic>"
171 obj = clamp(-sys.float_info.max, obj, sys.float_info.max)
172 return float(obj)
173 return obj
174 if avoid_realization:
175 return "<symbolic>"
176
177 recur = partial(to_jsonable, avoid_realization=avoid_realization)
178 if isinstance(obj, (list, tuple, set, frozenset)):
179 if isinstance(obj, tuple) and hasattr(obj, "_asdict"):
180 return recur(obj._asdict()) # treat namedtuples as dicts
181 return [recur(x) for x in obj]
182 if isinstance(obj, dict):
183 return {
184 k if isinstance(k, str) else pretty(k): recur(v) for k, v in obj.items()
185 }
186
187 # Hey, might as well try calling a .to_json() method - it works for Pandas!
188 # We try this before the below general-purpose handlers to give folks a
189 # chance to control this behavior on their custom classes.
190 try:
191 return recur(obj.to_json()) # type: ignore
192 except Exception:
193 pass
194
195 # Special handling for dataclasses, attrs, and pydantic classes
196 if (
197 (dcs := sys.modules.get("dataclasses"))
198 and dcs.is_dataclass(obj)
199 and not isinstance(obj, type)
200 ):
201 # Avoid dataclasses.asdict here to ensure that inner to_json overrides
202 # can get called as well
203 return {
204 field.name: recur(getattr(obj, field.name))
205 for field in dataclasses.fields(obj) # type: ignore
206 }
207 if attr.has(type(obj)):
208 return recur(attr.asdict(obj, recurse=False)) # type: ignore
209 if (pyd := sys.modules.get("pydantic")) and isinstance(obj, pyd.BaseModel):
210 return recur(obj.model_dump())
211
212 # If all else fails, we'll just pretty-print as a string.
213 return pretty(obj)