1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import dataclasses
12import sys
13from functools import partial
14from inspect import signature
15from typing import TYPE_CHECKING, Callable, TypeVar
16
17import attr
18
19from hypothesis.internal.cache import LRUReusedCache
20from hypothesis.internal.floats import clamp, float_to_int
21from hypothesis.internal.reflection import proxies
22from hypothesis.vendor.pretty import pretty
23
24if TYPE_CHECKING:
25 from typing import TypeAlias
26
27 from hypothesis.strategies._internal.strategies import SearchStrategy
28
T = TypeVar("T")
# One argument in cache-key form: a (type, value) pair, as built by
# convert_value below.
ValueKey: "TypeAlias" = tuple[type, object]
# (fn, args, kwargs)
StrategyCacheKey: "TypeAlias" = tuple[
    object, tuple[ValueKey, ...], frozenset[tuple[str, ValueKey]]
]

# Registry filled in by defines_strategy, keyed by the strategy function's
# ``__name__``.
# NOTE(review): the annotation says the values are callables, but
# defines_strategy stores ``inspect.signature(...)`` results here - confirm
# which is intended before relying on the annotation.
_strategies: dict[str, Callable[..., "SearchStrategy"]] = {}
# note: LRUReusedCache is already thread-local internally
_STRATEGY_CACHE = LRUReusedCache[StrategyCacheKey, object](1024)
39
40
def convert_value(v: object) -> ValueKey:
    """Turn one call argument into its ``(type, value)`` cache-key component.

    Floats are keyed on ``float_to_int(v)`` under the ``float`` type rather
    than on the float object itself; every other value is keyed on its own
    type and itself.
    """
    # presumably the integer form keeps floats that compare oddly (signed
    # zeros, NaNs) apart in the cache - confirm against float_to_int.
    if not isinstance(v, float):
        return (type(v), v)
    return (float, float_to_int(v))
45
46
def clear_cache() -> None:
    """Empty the module-level strategy cache used by ``cacheable``."""
    _STRATEGY_CACHE.clear()
49
50
def cacheable(fn: T) -> T:
    """Wrap ``fn`` so that repeated calls with equal arguments share a result.

    The wrapper looks results up in the module-level strategy cache under a
    key made of ``fn`` itself plus the ``convert_value`` form of every
    positional and keyword argument.  It calls ``fn`` directly, without
    touching the cache, when:

    * there is a current build context whose provider sets
      ``avoid_realization``, or
    * building or hashing the key raises ``TypeError`` (e.g. an unhashable
      argument).

    A freshly computed result is stored (and flagged ``_is_singleton``)
    unless it is a ``SearchStrategy`` whose ``is_cacheable`` is false.
    The wrapper also exposes ``clear_cache`` as its ``__clear_cache``
    attribute.
    """
    from hypothesis.control import _current_build_context
    from hypothesis.strategies._internal.strategies import SearchStrategy

    @proxies(fn)
    def cached_strategy(*args, **kwargs):
        ctx = _current_build_context.value
        if ctx is not None and ctx.data.provider.avoid_realization:
            return fn(*args, **kwargs)

        # Keyword arguments are hashed right away (they go into a frozenset),
        # so an unhashable value shows up here as a TypeError.
        try:
            keyed_kwargs = frozenset(
                (name, convert_value(value)) for name, value in kwargs.items()
            )
        except TypeError:
            return fn(*args, **kwargs)

        keyed_args = tuple(convert_value(arg) for arg in args)
        cache_key = (fn, keyed_args, keyed_kwargs)

        # Positional arguments are only hashed at lookup time, hence the
        # second TypeError guard.
        try:
            if cache_key in _STRATEGY_CACHE:
                return _STRATEGY_CACHE[cache_key]
        except TypeError:
            return fn(*args, **kwargs)

        strategy = fn(*args, **kwargs)
        if isinstance(strategy, SearchStrategy) and not strategy.is_cacheable:
            return strategy
        strategy._is_singleton = True
        _STRATEGY_CACHE[cache_key] = strategy
        return strategy

    cached_strategy.__clear_cache = clear_cache  # type: ignore
    return cached_strategy
80
81
def defines_strategy(
    *,
    force_reusable_values: bool = False,
    try_non_lazy: bool = False,
    never_lazy: bool = False,
) -> Callable[[T], T]:
    """Build a decorator for functions that define strategies.

    ``force_reusable_values=True`` marks the resulting lazy strategy with
    ``.has_reusable_values == True`` even when it is built from maps/filters
    or other non-reusable strategies, which tells our numpy/pandas strategies
    that they may implicitly use it for background values.

    ``try_non_lazy=True`` calls the strategy definition straight away and
    only falls back to a LazyStrategy if that call raises an exception.

    ``never_lazy=True`` skips lazy-wrapping entirely: the decorator hands
    back the original function unchanged.
    """

    def decorator(strategy_definition):
        """Register the function as a strategy and make it lazily
        evaluated."""
        _strategies[strategy_definition.__name__] = signature(strategy_definition)

        if never_lazy:
            assert not try_non_lazy
            # never_lazy + force_reusable_values could be supported with a
            # suitable wrapper, but nothing currently asks for that
            # combination.
            assert not force_reusable_values
            return strategy_definition

        from hypothesis.strategies._internal.lazy import LazyStrategy

        def make_lazy(args, kwargs):
            # Defer the call to strategy_definition by wrapping it up.
            lazy = LazyStrategy(strategy_definition, args, kwargs)
            if force_reusable_values:
                # With `force_has_reusable_values` set, the recursive
                # property code reports `.has_reusable_values == True`.
                lazy.force_has_reusable_values = True
                assert lazy.has_reusable_values
            return lazy

        @proxies(strategy_definition)
        def accept(*args, **kwargs):
            if try_non_lazy:
                # Not done unconditionally: eager evaluation everywhere would
                # nest recursive strategies very deeply, so stay lazy unless
                # eagerness is *known* to be the right choice.
                try:
                    return strategy_definition(*args, **kwargs)
                except Exception:
                    # The definition raised; fall through to a LazyStrategy
                    # so the same error happens again later.
                    pass
            return make_lazy(args, kwargs)

        accept.is_hypothesis_strategy_function = True
        return accept

    return decorator
142
143
def to_jsonable(obj: object, *, avoid_realization: bool) -> object:
    """Recursively convert an object to json-encodable form.

    This is not intended to round-trip, but rather provide an analysis-ready
    format for observability. To avoid side effects, we pretty-print all but
    known types.

    Conversion rules, tried in order:

    * ``str``, ``float``, ``bool``, ``None`` and ints with ``abs(obj) < 2**63``
      are returned unchanged; larger ints become floats (clamped to the
      finite float range).
    * lists, tuples, sets and frozensets become lists; namedtuples and dicts
      become dicts, with non-string keys pretty-printed.
    * an object with a working ``.to_json()`` method is converted from that
      method's result.
    * dataclass instances, attrs instances and pydantic models become dicts
      of their fields.
    * anything else is pretty-printed to a string.

    If ``avoid_realization`` is true, every value other than the unchanged
    primitives above is replaced by the string ``"<symbolic>"`` instead of
    being inspected.
    """
    if isinstance(obj, (str, int, float, bool, type(None))):
        # We convert integers of magnitude 2**63 or more to floats, to avoid
        # crashing external utilities with a 64 bit integer cap (notably,
        # sqlite). See
        # https://github.com/HypothesisWorks/hypothesis/pull/3797#discussion_r1413425110
        # and https://github.com/simonw/sqlite-utils/issues/605.
        if isinstance(obj, int) and not isinstance(obj, bool) and abs(obj) >= 2**63:
            # Silently clamp very large ints to max_float, to avoid OverflowError when
            # casting to float.  (but avoid adding more constraints to symbolic values)
            if avoid_realization:
                return "<symbolic>"
            obj = clamp(-sys.float_info.max, obj, sys.float_info.max)
            return float(obj)
        return obj
    if avoid_realization:
        return "<symbolic>"

    recur = partial(to_jsonable, avoid_realization=avoid_realization)
    if isinstance(obj, (list, tuple, set, frozenset)):
        if isinstance(obj, tuple) and hasattr(obj, "_asdict"):
            return recur(obj._asdict())  # treat namedtuples as dicts
        return [recur(x) for x in obj]
    if isinstance(obj, dict):
        return {
            k if isinstance(k, str) else pretty(k): recur(v) for k, v in obj.items()
        }

    # Hey, might as well try calling a .to_json() method - it works for Pandas!
    # We try this before the below general-purpose handlers to give folks a
    # chance to control this behavior on their custom classes.
    try:
        return recur(obj.to_json())  # type: ignore
    except Exception:
        # Deliberately best-effort: a missing or failing to_json() just means
        # we fall through to the generic handlers below.
        pass

    # Special handling for dataclasses, attrs, and pydantic classes
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        # Avoid dataclasses.asdict here to ensure that inner to_json overrides
        # can get called as well
        return {
            field.name: recur(getattr(obj, field.name))
            for field in dataclasses.fields(obj)  # type: ignore
        }
    if attr.has(type(obj)):
        return recur(attr.asdict(obj, recurse=False))  # type: ignore
    # pydantic is only consulted if the user has already imported it.
    if (pyd := sys.modules.get("pydantic")) and isinstance(obj, pyd.BaseModel):
        # pydantic v2 spells this .model_dump(); v1 models only have .dict().
        # Accept either so a v1 model doesn't raise AttributeError here, and
        # fall back to pretty-printing if neither is available.
        dump = getattr(obj, "model_dump", None) or getattr(obj, "dict", None)
        if callable(dump):
            return recur(dump())

    # If all else fails, we'll just pretty-print as a string.
    return pretty(obj)