1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import dataclasses
12import sys
13from functools import partial
14from typing import TYPE_CHECKING, Callable, TypeVar
15from weakref import WeakValueDictionary
16
17import attr
18
19from hypothesis.internal.cache import LRUReusedCache
20from hypothesis.internal.floats import clamp, float_to_int
21from hypothesis.internal.reflection import proxies
22from hypothesis.vendor.pretty import pretty
23
24if TYPE_CHECKING:
25 from typing import TypeAlias
26
# Generic placeholder: the decorators in this module are annotated as returning
# the same type they are given.
T = TypeVar("T")
# Cache-key form of a single argument, as built by convert_value(): the value
# is paired with a type so that equal values of different types stay distinct.
ValueKey: "TypeAlias" = tuple[type, object]
# (fn, args, kwargs)
# args are an ordered tuple of ValueKeys; kwargs are an unordered frozenset of
# (name, ValueKey) pairs, so keyword order does not affect the key.
StrategyCacheKey: "TypeAlias" = tuple[
    object, tuple[ValueKey, ...], frozenset[tuple[str, ValueKey]]
]

# Registry of strategy-definition functions keyed by their ``__name__``; it is
# filled in by defines_strategy(). Values are held weakly.
_all_strategies: WeakValueDictionary[str, Callable] = WeakValueDictionary()
# note: LRUReusedCache is already thread-local internally
# Shared by every function wrapped with cacheable(). The 1024 is presumably the
# maximum number of entries - confirm against LRUReusedCache.
_STRATEGY_CACHE = LRUReusedCache[StrategyCacheKey, object](1024)
37
38
def convert_value(v: object) -> ValueKey:
    """Return the cache-key representation of a single argument value.

    Non-float values are keyed as ``(type(v), v)``; including the exact type
    keeps values that compare equal across types (such as ``1`` and ``True``)
    from sharing a key.

    Floats, including instances of float subclasses, are keyed as
    ``(float, float_to_int(v))`` instead of by value. Presumably this is so
    that floats which compare equal (or never compare equal, like NaN) are
    keyed by their bit pattern - confirm against ``float_to_int``.
    """
    if not isinstance(v, float):
        return (type(v), v)
    return (float, float_to_int(v))
43
44
def clear_cache() -> None:
    """Empty the module-level strategy cache, ``_STRATEGY_CACHE``."""
    _STRATEGY_CACHE.clear()
47
48
def cacheable(fn: T) -> T:
    """Wrap ``fn`` so that its results are memoised in ``_STRATEGY_CACHE``.

    The cache key is ``(fn, args, kwargs)`` with every argument passed through
    ``convert_value``. The wrapper falls back to calling ``fn`` directly,
    without touching the cache, when:

    * there is a current build context whose provider sets
      ``avoid_realization``;
    * building the keyword part of the key raises ``TypeError`` (for example
      an unhashable argument); or
    * looking the key up in the cache raises ``TypeError``.

    A freshly computed result is stored unless it is a ``SearchStrategy``
    whose ``is_cacheable`` is false. The returned wrapper also exposes
    ``clear_cache`` as its ``__clear_cache`` attribute.
    """
    from hypothesis.control import _current_build_context
    from hypothesis.strategies._internal.strategies import SearchStrategy

    @proxies(fn)
    def cached_strategy(*args, **kwargs):
        build_context = _current_build_context.value
        if (
            build_context is not None
            and build_context.data.provider.avoid_realization
        ):
            return fn(*args, **kwargs)

        # Hashing each (name, key) pair is what can fail for unhashable values.
        try:
            keyed_kwargs = frozenset(
                (name, convert_value(value)) for name, value in kwargs.items()
            )
        except TypeError:
            return fn(*args, **kwargs)

        keyed_args = tuple(convert_value(arg) for arg in args)
        cache_key = (fn, keyed_args, keyed_kwargs)

        # Positional arguments are only hashed here, on first lookup.
        try:
            if cache_key in _STRATEGY_CACHE:
                return _STRATEGY_CACHE[cache_key]
        except TypeError:
            return fn(*args, **kwargs)

        # Cache miss: compute, then store unless the strategy opts out.
        result = fn(*args, **kwargs)
        if not isinstance(result, SearchStrategy) or result.is_cacheable:
            _STRATEGY_CACHE[cache_key] = result
        return result

    cached_strategy.__clear_cache = clear_cache  # type: ignore
    return cached_strategy
77
78
def defines_strategy(
    *,
    force_reusable_values: bool = False,
    try_non_lazy: bool = False,
    never_lazy: bool = False,
) -> Callable[[T], T]:
    """Build a decorator for functions that define strategies.

    ``force_reusable_values``: when True, the resulting strategy is marked
    with ``.has_reusable_values == True`` even if it is built from
    maps/filters or non-reusable strategies. This tells our numpy/pandas
    strategies that they can implicitly use such strategies as background
    values.

    ``try_non_lazy``: when True, the strategy definition function is called
    immediately, and a LazyStrategy is returned only if that call raises an
    exception.

    ``never_lazy``: when True, no lazy-wrapping is performed at all and the
    decorator hands back the original function. It cannot be combined with
    either of the other two options.
    """

    def decorator(strategy_definition):
        """Register ``strategy_definition`` by name and, unless ``never_lazy``
        was requested, return a wrapper that evaluates it lazily."""
        _all_strategies[strategy_definition.__name__] = strategy_definition

        if never_lazy:
            # never_lazy + force_reusable_values could be supported with a
            # suitable wrapper, but no caller currently asks for it; the same
            # invariant check also rules out never_lazy + try_non_lazy.
            assert not (try_non_lazy or force_reusable_values)
            return strategy_definition

        from hypothesis.strategies._internal.lazy import LazyStrategy

        @proxies(strategy_definition)
        def accept(*args, **kwargs):
            if try_non_lazy:
                # Eager evaluation is opt-in rather than unconditional:
                # doing it everywhere would produce very deeply nested
                # recursive strategies, so stay lazy unless we *know* that
                # eager is the right choice.
                try:
                    return strategy_definition(*args, **kwargs)
                except Exception:
                    # Fall through to the lazy wrapper so that the same
                    # failure is raised again later, when it is evaluated.
                    pass

            lazy = LazyStrategy(strategy_definition, args, kwargs)
            if force_reusable_values:
                # The recursive-property machinery reads this flag and
                # reports `.has_reusable_values == True` as a result.
                lazy.force_has_reusable_values = True
                assert lazy.has_reusable_values
            return lazy

        accept.is_hypothesis_strategy_function = True
        return accept

    return decorator
139
140
def to_jsonable(obj: object, *, avoid_realization: bool) -> object:
    """Recursively convert ``obj`` into a json-encodable structure.

    The result is an analysis-ready format for observability and is not meant
    to round-trip. To avoid side effects, anything that is not a known type
    is pretty-printed to a string.

    Conversion rules, in order:

    * ``str``, ``int``, ``float``, ``bool`` and ``None`` are returned as-is,
      except for ints with ``abs(obj) >= 2**63`` (see below).
    * If ``avoid_realization`` is true, everything else, and any such large
      int, becomes the string ``"<symbolic>"``.
    * Namedtuples are treated as dicts; other lists, tuples, sets and
      frozensets become lists.
    * Dicts keep string keys and pretty-print any other key.
    * Objects with a working ``.to_json()`` method use its (converted) result.
    * Dataclass instances, attrs instances and pydantic models become dicts.
    * Anything else is pretty-printed.
    """
    symbolic = "<symbolic>"

    if isinstance(obj, (str, int, float, bool, type(None))):
        # Ints at or beyond 2**63 are turned into floats so that external
        # tools with a 64 bit integer cap (notably, sqlite) do not crash. See
        # https://github.com/HypothesisWorks/hypothesis/pull/3797#discussion_r1413425110
        # and https://github.com/simonw/sqlite-utils/issues/605.
        oversized_int = (
            isinstance(obj, int) and not isinstance(obj, bool) and abs(obj) >= 2**63
        )
        if not oversized_int:
            return obj
        # Do not add more constraints to symbolic values.
        if avoid_realization:
            return symbolic
        # Silently clamp to the largest finite float first, so that the
        # float() cast cannot raise OverflowError.
        float_max = sys.float_info.max
        return float(clamp(-float_max, obj, float_max))

    if avoid_realization:
        return symbolic

    def recur(value):
        return to_jsonable(value, avoid_realization=avoid_realization)

    if isinstance(obj, (list, tuple, set, frozenset)):
        if isinstance(obj, tuple) and hasattr(obj, "_asdict"):
            # Namedtuples carry field names, so report them as dicts.
            return recur(obj._asdict())
        return list(map(recur, obj))

    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            json_key = key if isinstance(key, str) else pretty(key)
            converted[json_key] = recur(value)
        return converted

    # A .to_json() method (pandas has one, for example) is tried before the
    # general-purpose handlers below, so that custom classes get a chance to
    # control their own representation. Any failure simply falls through.
    try:
        return recur(obj.to_json())  # type: ignore
    except Exception:
        pass

    # Special handling for dataclasses, attrs, and pydantic classes.
    dcs = sys.modules.get("dataclasses")
    if dcs and dcs.is_dataclass(obj) and not isinstance(obj, type):
        # dataclasses.asdict is deliberately not used here: walking the
        # fields ourselves lets inner to_json overrides be called as well.
        return {
            field.name: recur(getattr(obj, field.name))
            for field in dataclasses.fields(obj)  # type: ignore
        }

    if attr.has(type(obj)):
        return recur(attr.asdict(obj, recurse=False))  # type: ignore

    pyd = sys.modules.get("pydantic")
    if pyd and isinstance(obj, pyd.BaseModel):
        return recur(obj.model_dump())

    # If all else fails, we'll just pretty-print as a string.
    return pretty(obj)