# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import enum
import hashlib
import heapq
import math
import sys
from collections import OrderedDict, abc
from collections.abc import Sequence
from functools import lru_cache
from typing import TYPE_CHECKING, Optional, TypeVar, Union

from hypothesis.errors import InvalidArgument
from hypothesis.internal.compat import int_from_bytes
from hypothesis.internal.floats import next_up

if TYPE_CHECKING:
    from hypothesis.internal.conjecture.data import ConjectureData


LABEL_MASK = 2**64 - 1


def calc_label_from_name(name: str) -> int:
    hashed = hashlib.sha384(name.encode()).digest()
    return int_from_bytes(hashed[:8])


def calc_label_from_cls(cls: type) -> int:
    return calc_label_from_name(cls.__qualname__)


def calc_label_from_hash(obj: object) -> int:
    return calc_label_from_name(str(hash(obj)))


def combine_labels(*labels: int) -> int:
    label = 0
    for l in labels:
        label = (label << 1) & LABEL_MASK
        label ^= l
    return label
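
# A note on the fold above: labels are 64-bit values, and combine_labels folds
# its arguments left-to-right, so for two labels
#     combine_labels(a, b) == ((a << 1) & LABEL_MASK) ^ b
# The shift makes the combination order-sensitive, unlike a plain XOR of all
# the labels.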


SAMPLE_IN_SAMPLER_LABEL = calc_label_from_name("a sample() in Sampler")
ONE_FROM_MANY_LABEL = calc_label_from_name("one more from many()")


T = TypeVar("T")


def identity(v: T) -> T:
    return v


def check_sample(
    values: Union[type[enum.Enum], Sequence[T]], strategy_name: str
) -> Sequence[T]:
    if "numpy" in sys.modules and isinstance(values, sys.modules["numpy"].ndarray):
        if values.ndim != 1:
            raise InvalidArgument(
                "Only one-dimensional arrays are supported for sampling, "
                f"and the given value has {values.ndim} dimensions (shape "
                f"{values.shape}). This array would give samples of array slices "
                "instead of elements! Use np.ravel(values) to convert "
                "to a one-dimensional array, or tuple(values) if you "
                "want to sample slices."
            )
    elif not isinstance(values, (OrderedDict, abc.Sequence, enum.EnumMeta)):
        raise InvalidArgument(
            f"Cannot sample from {values!r} because it is not an ordered collection. "
            f"Hypothesis goes to some length to ensure that the {strategy_name} "
            "strategy has stable results between runs. To replay a saved "
            "example, the sampled values must have the same iteration order "
            "on every run - ruling out sets, dicts, etc due to hash "
            "randomization. Most cases can simply use `sorted(values)`, but "
            "mixed types or special values such as math.nan require careful "
            "handling - and note that when simplifying an example, "
            "Hypothesis treats earlier values as simpler."
        )
    if isinstance(values, range):
        # Pyright is unhappy with every way I've tried to type-annotate this
        # function, so fine, we'll just ignore the analysis error.
        return values  # type: ignore
    return tuple(values)
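
# Illustrative behaviour (a sketch, not a doctest): ordered collections pass
# through, e.g. check_sample([3, 1, 2], "sampled_from") returns (3, 1, 2) and a
# range() is returned unchanged, while an unordered input such as {1, 2} raises
# InvalidArgument because its iteration order is not stable across runs.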


@lru_cache(64)
def compute_sampler_table(weights: tuple[float, ...]) -> list[tuple[int, int, float]]:
    n = len(weights)
    table: list[list[int | float | None]] = [[i, None, None] for i in range(n)]
    total = sum(weights)
    num_type = type(total)

    zero = num_type(0)  # type: ignore
    one = num_type(1)  # type: ignore

    small: list[int] = []
    large: list[int] = []

    probabilities = [w / total for w in weights]
    scaled_probabilities: list[float] = []

    for i, alternate_chance in enumerate(probabilities):
        scaled = alternate_chance * n
        scaled_probabilities.append(scaled)
        if scaled == 1:
            table[i][2] = zero
        elif scaled < 1:
            small.append(i)
        else:
            large.append(i)
    heapq.heapify(small)
    heapq.heapify(large)

    while small and large:
        lo = heapq.heappop(small)
        hi = heapq.heappop(large)

        assert lo != hi
        assert scaled_probabilities[hi] > one
        assert table[lo][1] is None
        table[lo][1] = hi
        table[lo][2] = one - scaled_probabilities[lo]
        scaled_probabilities[hi] = (
            scaled_probabilities[hi] + scaled_probabilities[lo]
        ) - one

        if scaled_probabilities[hi] < 1:
            heapq.heappush(small, hi)
        elif scaled_probabilities[hi] == 1:
            table[hi][2] = zero
        else:
            heapq.heappush(large, hi)
    while large:
        table[large.pop()][2] = zero
    while small:
        table[small.pop()][2] = zero

    new_table: list[tuple[int, int, float]] = []
    for base, alternate, alternate_chance in table:
        assert isinstance(base, int)
        assert isinstance(alternate, int) or alternate is None
        assert alternate_chance is not None
        if alternate is None:
            new_table.append((base, base, alternate_chance))
        elif alternate < base:
            new_table.append((alternate, base, one - alternate_chance))
        else:
            new_table.append((base, alternate, alternate_chance))
    new_table.sort()
    return new_table
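
# Worked example of the algorithm above: for weights (1.0, 1.0, 2.0) the
# normalized probabilities are 1/4, 1/4, 1/2, which scale by n=3 to
# 0.75, 0.75, 1.5.  Entries 0 and 1 each borrow their missing 0.25 of mass
# from entry 2, giving
#     compute_sampler_table((1.0, 1.0, 2.0))
#     == [(0, 2, 0.25), (1, 2, 0.25), (2, 2, 0.0)]
# Picking a row uniformly and then taking the alternate with probability p
# recovers the target distribution: P(2) == (0.25 + 0.25 + 1.0) / 3 == 0.5.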


class Sampler:
    """Sampler based on Vose's algorithm for the alias method. See
    http://www.keithschwarz.com/darts-dice-coins/ for a good explanation.

    The general idea is that we store a table of triples (base, alternate, p).
    We then pick a triple uniformly at random, and choose its alternate value
    with probability p, and otherwise its base value. The triples are chosen
    so that the resulting mixture has the right distribution.

    We maintain the following invariants to try to produce good shrinks:

    1. The table is in lexicographic (base, alternate) order, so that choosing
       an earlier value in the list always lowers (or at least leaves
       unchanged) the value.
    2. base[i] < alternate[i], so that shrinking the draw always results in
       shrinking the chosen element.
    """

    table: list[tuple[int, int, float]]  # (base_idx, alt_idx, alt_chance)

    def __init__(self, weights: Sequence[float], *, observe: bool = True):
        self.observe = observe
        self.table = compute_sampler_table(tuple(weights))

    def sample(
        self,
        data: "ConjectureData",
        *,
        forced: Optional[int] = None,
    ) -> int:
        if self.observe:
            data.start_span(SAMPLE_IN_SAMPLER_LABEL)
        forced_choice = (  # pragma: no branch # https://github.com/nedbat/coveragepy/issues/1617
            None
            if forced is None
            else next(
                (base, alternate, alternate_chance)
                for (base, alternate, alternate_chance) in self.table
                if forced == base or (forced == alternate and alternate_chance > 0)
            )
        )
        base, alternate, alternate_chance = data.choice(
            self.table,
            forced=forced_choice,
            observe=self.observe,
        )
        forced_use_alternate = None
        if forced is not None:
            # we maintain this invariant when picking forced_choice above.
            # This song and dance about alternate_chance > 0 is to avoid forcing
            # e.g. draw_boolean(p=0, forced=True), which is an error.
            forced_use_alternate = forced == alternate and alternate_chance > 0
            assert forced == base or forced_use_alternate

        use_alternate = data.draw_boolean(
            alternate_chance,
            forced=forced_use_alternate,
            observe=self.observe,
        )
        if self.observe:
            data.stop_span()
        if use_alternate:
            assert forced is None or alternate == forced, (forced, alternate)
            return alternate
        else:
            assert forced is None or base == forced, (forced, base)
            return base
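
# Usage sketch (assumes a live ConjectureData instance, as with
# INT_SIZES_SAMPLER below): `Sampler([4.0, 8.0, 1.0]).sample(data)` draws one
# table row uniformly via data.choice, then a biased coin via data.draw_boolean
# to return either that row's base index or its alternate index.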


INT_SIZES = (8, 16, 32, 64, 128)
INT_SIZES_SAMPLER = Sampler((4.0, 8.0, 1.0, 1.0, 0.5), observe=False)
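# (These weights bias INT_SIZES_SAMPLER towards small sizes: the 8- and 16-bit
# sizes carry relative weights 4 and 8, versus 1, 1, and 0.5 for the 32-, 64-,
# and 128-bit sizes.)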


class many:
    """Utility class for collections. Bundles up the logic we use for "should I
    keep drawing more values?" and handles starting and stopping examples in
    the right place.

    Intended usage is something like:

    elements = many(data, ...)
    while elements.more():
        add_stuff_to_result()
    """

    def __init__(
        self,
        data: "ConjectureData",
        min_size: int,
        max_size: Union[int, float],
        average_size: Union[int, float],
        *,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> None:
        assert 0 <= min_size <= average_size <= max_size
        assert forced is None or min_size <= forced <= max_size
        self.min_size = min_size
        self.max_size = max_size
        self.data = data
        self.forced_size = forced
        self.p_continue = _calc_p_continue(average_size - min_size, max_size - min_size)
        self.count = 0
        self.rejections = 0
        self.drawn = False
        self.force_stop = False
        self.rejected = False
        self.observe = observe

    def stop_span(self):
        if self.observe:
            self.data.stop_span()

    def start_span(self, label):
        if self.observe:
            self.data.start_span(label)

    def more(self) -> bool:
        """Should I draw another element to add to the collection?"""
        if self.drawn:
            self.stop_span()

        self.drawn = True
        self.rejected = False

        self.start_span(ONE_FROM_MANY_LABEL)
        if self.min_size == self.max_size:
            # if we have to hit an exact size, draw unconditionally until that
            # point, and no further.
            should_continue = self.count < self.min_size
        else:
            forced_result = None
            if self.force_stop:
                # if our size is forced, we can't reject in a way that would
                # cause us to differ from the forced size.
                assert self.forced_size is None or self.count == self.forced_size
                forced_result = False
            elif self.count < self.min_size:
                forced_result = True
            elif self.count >= self.max_size:
                forced_result = False
            elif self.forced_size is not None:
                forced_result = self.count < self.forced_size
            should_continue = self.data.draw_boolean(
                self.p_continue,
                forced=forced_result,
                observe=self.observe,
            )

        if should_continue:
            self.count += 1
            return True
        else:
            self.stop_span()
            return False

    def reject(self, why: Optional[str] = None) -> None:
        """Reject the last example (i.e. don't count it towards our budget of
        elements because it's not going to go in the final collection)."""
        assert self.count > 0
        self.count -= 1
        self.rejections += 1
        self.rejected = True
        # We set a minimum number of rejections before we give up to avoid
        # failing too fast when we reject the first draw.
        if self.rejections > max(3, 2 * self.count):
            if self.count < self.min_size:
                self.data.mark_invalid(why)
            else:
                self.force_stop = True


SMALLEST_POSITIVE_FLOAT: float = next_up(0.0) or sys.float_info.min


@lru_cache
def _calc_p_continue(desired_avg: float, max_size: Union[int, float]) -> float:
    """Return the p_continue which will generate the desired average size."""
    assert desired_avg <= max_size, (desired_avg, max_size)
    if desired_avg == max_size:
        return 1.0
    p_continue = 1 - 1.0 / (1 + desired_avg)
    if p_continue == 0 or max_size == math.inf:
        assert 0 <= p_continue < 1, p_continue
        return p_continue
    assert 0 < p_continue < 1, p_continue
    # For small max_size, the infinite-series p_continue is a poor
    # approximation, and while we can't solve the polynomial directly, a few
    # rounds of iteration quickly get us a good approximate solution in almost
    # all cases (sometimes an exact one!).
    while _p_continue_to_avg(p_continue, max_size) > desired_avg:
        # This is impossible over the reals, but *can* happen with floats.
        p_continue -= 0.0001
        # If we've reached zero or gone negative, we want to break out of this loop,
        # and do so even if we're on a system with the unsafe denormals-are-zero flag.
        # We make that an explicit error in st.floats(), but here we'd prefer to
        # just get somewhat worse precision on collection lengths.
        if p_continue < SMALLEST_POSITIVE_FLOAT:
            p_continue = SMALLEST_POSITIVE_FLOAT
            break
    # Let's binary-search our way to a better estimate! We tried fancier options
    # like gradient descent, but this is numerically stable and works better.
    hi = 1.0
    while desired_avg - _p_continue_to_avg(p_continue, max_size) > 0.01:
        assert 0 < p_continue < hi, (p_continue, hi)
        mid = (p_continue + hi) / 2
        if _p_continue_to_avg(mid, max_size) <= desired_avg:
            p_continue = mid
        else:
            hi = mid
    assert 0 < p_continue < 1, p_continue
    assert _p_continue_to_avg(p_continue, max_size) <= desired_avg
    return p_continue


def _p_continue_to_avg(p_continue: float, max_size: Union[int, float]) -> float:
    """Return the average_size generated by this p_continue and max_size."""
    if p_continue >= 1:
        return max_size
    return (1.0 / (1 - p_continue) - 1) * (1 - p_continue**max_size)
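
# Sanity check relating the two helpers above: with no size cap the process is
# geometric, so p_continue = 1 - 1/(1 + avg) exactly inverts avg = p/(1 - p);
# e.g. _p_continue_to_avg(0.5, math.inf) == 1.0 and
# _calc_p_continue(1.0, math.inf) == 0.5.  For finite max_size the closed form
# above is avg == (p / (1 - p)) * (1 - p**max_size), which _calc_p_continue
# inverts numerically.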