1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import sys
12import threading
13from inspect import signature
14from typing import TYPE_CHECKING, Callable, Dict
15
16import attr
17
18from hypothesis.internal.cache import LRUReusedCache
19from hypothesis.internal.compat import dataclass_asdict
20from hypothesis.internal.conjecture.junkdrawer import clamp
21from hypothesis.internal.floats import float_to_int
22from hypothesis.internal.reflection import proxies
23from hypothesis.vendor.pretty import pretty
24
25if TYPE_CHECKING:
26 from hypothesis.strategies._internal.strategies import SearchStrategy, T
27
# Registry populated by the ``defines_strategy`` decorator, keyed by the
# decorated function's ``__name__``.
# NOTE(review): ``defines_strategy`` stores ``inspect.signature(...)`` of the
# function here rather than the function itself, so the value type in the
# annotation below looks out of date - confirm before relying on it.
_strategies: Dict[str, Callable[..., "SearchStrategy"]] = {}
29
30
class FloatKey:
    """Hashable stand-in for a float, compared via ``float_to_int``.

    Two keys are equal exactly when the integers that ``float_to_int``
    produced for their floats are equal, rather than when the floats
    themselves compare equal. Anything that is not a ``FloatKey`` is never
    equal to one.
    """

    def __init__(self, f):
        # Only the integer form is kept; the original float is not retained.
        self.value = float_to_int(f)

    def __eq__(self, other):
        if not isinstance(other, FloatKey):
            return False
        return other.value == self.value

    def __ne__(self, other):
        # Spelled out as the exact negation of ``__eq__``.
        return False if self.__eq__(other) else True

    def __hash__(self):
        # Derived from the same integer that ``__eq__`` compares, so equal
        # keys always hash alike.
        return hash(self.value)
43
44
def convert_value(v):
    """Turn an argument value into the form used inside strategy cache keys.

    Floats are wrapped in ``FloatKey``. Every other value is paired with its
    exact type, so values that compare equal across types (such as ``1`` and
    ``True``) still yield different keys.
    """
    return FloatKey(v) if isinstance(v, float) else (type(v), v)
49
50
# Thread-local holder for the strategy cache: ``get_cache`` lazily attaches a
# ``STRATEGY_CACHE`` attribute to it, so each thread ends up with its own cache.
_CACHE = threading.local()
52
53
def get_cache() -> LRUReusedCache:
    """Return the calling thread's strategy cache, creating it on first use.

    The cache lives as the ``STRATEGY_CACHE`` attribute of the thread-local
    ``_CACHE`` object and is an ``LRUReusedCache`` constructed with ``1024``.
    """
    if not hasattr(_CACHE, "STRATEGY_CACHE"):
        # First access from this thread: nothing has been attached yet.
        _CACHE.STRATEGY_CACHE = LRUReusedCache(1024)
    return _CACHE.STRATEGY_CACHE
60
61
def clear_cache() -> None:
    """Empty the calling thread's strategy cache (see ``get_cache``)."""
    get_cache().clear()
65
66
def cacheable(fn: "T") -> "T":
    """Wrap ``fn`` so calls with equal arguments can share one cached result.

    The cache key is built from ``fn`` itself plus every positional and
    keyword argument passed through ``convert_value``. If any argument turns
    out to be unhashable, the wrapper simply calls ``fn`` without touching the
    cache. A result is stored unless it is a ``SearchStrategy`` whose
    ``is_cacheable`` is false.

    The returned wrapper also carries a ``__clear_cache`` attribute bound to
    ``clear_cache``.
    """
    from hypothesis.strategies._internal.strategies import SearchStrategy

    @proxies(fn)
    def cached_strategy(*args, **kwargs):
        try:
            # Each (name, value) pair is hashed while the frozenset is built,
            # so unhashable keyword values surface as TypeError right here.
            keyword_key = frozenset(
                (name, convert_value(value)) for name, value in kwargs.items()
            )
        except TypeError:
            return fn(*args, **kwargs)

        positional_key = tuple(convert_value(arg) for arg in args)
        cache_key = (fn, positional_key, keyword_key)
        cache = get_cache()

        try:
            # Building the tuple above hashes nothing; unhashable positional
            # arguments only fail once the cache looks the key up.
            if cache_key in cache:
                return cache[cache_key]
        except TypeError:
            return fn(*args, **kwargs)

        # Cache miss: compute outside the ``try`` so that a TypeError raised
        # by ``fn`` itself propagates to the caller.
        result = fn(*args, **kwargs)
        if isinstance(result, SearchStrategy) and not result.is_cacheable:
            return result
        cache[cache_key] = result
        return result

    cached_strategy.__clear_cache = clear_cache  # type: ignore
    return cached_strategy
91
92
def defines_strategy(
    *,
    force_reusable_values: bool = False,
    try_non_lazy: bool = False,
    never_lazy: bool = False,
) -> Callable[["T"], "T"]:
    """Build a decorator for functions that define strategies.

    ``force_reusable_values=True`` marks the returned strategy with
    ``.has_reusable_values == True`` even when it is built from maps/filters
    or non-reusable strategies. This tells our numpy/pandas strategies that
    they may implicitly use such strategies as background values.

    ``try_non_lazy=True`` makes each call run the strategy definition
    function straight away; a LazyStrategy is only returned if that raises an
    exception.

    ``never_lazy=True`` disables lazy-wrapping altogether: the decorator hands
    back the original function. It cannot be combined with either of the
    other two options.
    """

    def decorator(strategy_definition):
        """Register ``strategy_definition`` and return its lazily evaluated
        wrapper (or the function itself when ``never_lazy`` is set)."""
        _strategies[strategy_definition.__name__] = signature(strategy_definition)

        if never_lazy:
            assert not try_non_lazy
            # never_lazy + force_reusable_values could be supported with a
            # suitable wrapper, but no caller currently asks for that
            # combination.
            assert not force_reusable_values
            return strategy_definition

        from hypothesis.strategies._internal.lazy import LazyStrategy

        def make_lazy(args, kwargs):
            # Defer the real call by capturing the function and its arguments.
            lazy = LazyStrategy(strategy_definition, args, kwargs)
            if force_reusable_values:
                # Setting `force_has_reusable_values` here causes the
                # recursive property code to set
                # `.has_reusable_values == True`.
                lazy.force_has_reusable_values = True
                assert lazy.has_reusable_values
            return lazy

        @proxies(strategy_definition)
        def accept(*args, **kwargs):
            if try_non_lazy:
                # This is not attempted unconditionally because it would lead
                # to very deep nesting of recursive strategies - better to be
                # lazy unless we *know* eager evaluation is the right choice.
                try:
                    return strategy_definition(*args, **kwargs)
                except Exception:
                    # The definition raised: fall through and wrap it in a
                    # LazyStrategy so that the error happens again later.
                    pass
            return make_lazy(args, kwargs)

        accept.is_hypothesis_strategy_function = True
        return accept

    return decorator
153
154
def to_jsonable(obj: object) -> object:
    """Recursively convert an object to json-encodable form.

    This is not intended to round-trip, but rather provide an analysis-ready
    format for observability. To avoid side effects, we pretty-print all but
    known types.

    Conversions are attempted in this order:

    * ``str``, ``int``, ``float``, ``bool`` and ``None`` are returned as-is,
      except that ints with ``abs(obj) >= 2**63`` are returned as floats.
    * namedtuples are converted via ``_asdict()``; other lists, tuples, sets
      and frozensets become lists of converted elements.
    * dicts keep their string keys, have any other key pretty-printed, and
      have their values converted.
    * objects with a working ``to_json()`` method use its (converted) result.
    * dataclass instances, attrs instances and pydantic models are turned
      into dicts and converted.
    * anything else is returned as its pretty-printed string.
    """
    if isinstance(obj, (str, int, float, bool, type(None))):
        if isinstance(obj, int) and abs(obj) >= 2**63:
            # Silently clamp very large ints to max_float, to avoid
            # OverflowError when casting to float.
            obj = clamp(-sys.float_info.max, obj, sys.float_info.max)
            return float(obj)
        return obj
    if isinstance(obj, (list, tuple, set, frozenset)):
        if isinstance(obj, tuple) and hasattr(obj, "_asdict"):
            return to_jsonable(obj._asdict())  # treat namedtuples as dicts
        return [to_jsonable(x) for x in obj]
    if isinstance(obj, dict):
        return {
            k if isinstance(k, str) else pretty(k): to_jsonable(v)
            for k, v in obj.items()
        }

    # Hey, might as well try calling a .to_json() method - it works for Pandas!
    # We try this before the below general-purpose handlers to give folks a
    # chance to control this behavior on their custom classes.
    try:
        return to_jsonable(obj.to_json())  # type: ignore
    except Exception:
        # Deliberately best-effort: no such method, or it failed, so fall
        # through to the generic handlers below.
        pass

    # Special handling for dataclasses, attrs, and pydantic classes.
    # dataclasses and pydantic are looked up in sys.modules rather than
    # imported: if the module was never imported, obj cannot be one of its
    # instances.
    if (
        (dcs := sys.modules.get("dataclasses"))
        and dcs.is_dataclass(obj)
        and not isinstance(obj, type)
    ):
        return to_jsonable(dataclass_asdict(obj))
    if attr.has(type(obj)):
        return to_jsonable(attr.asdict(obj, recurse=False))  # type: ignore
    if (pyd := sys.modules.get("pydantic")) and isinstance(obj, pyd.BaseModel):
        # pydantic v2 spells this ``model_dump()``; v1 models only provide
        # ``dict()``, so fall back to that instead of raising AttributeError.
        dump = obj.model_dump if hasattr(obj, "model_dump") else obj.dict
        return to_jsonable(dump())

    # If all else fails, we'll just pretty-print as a string.
    return pretty(obj)