1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""A module for miscellaneous useful bits and bobs that don't
12obviously belong anywhere else. If you spot a better home for
13anything that lives here, please move it."""
14
15import array
16import gc
17import sys
18import time
19import warnings
20from random import Random
21from typing import (
22 Callable,
23 Dict,
24 Generic,
25 Iterable,
26 Iterator,
27 List,
28 Optional,
29 Sequence,
30 Tuple,
31 TypeVar,
32 Union,
33 overload,
34)
35
36from sortedcontainers import SortedList
37
38from hypothesis.errors import HypothesisWarning
39
# Storage codes in the order IntList tries them. The first five are
# unsigned-integer typecodes for ``array.array``; "O" is not an array
# typecode but the marker that ``array_or_list`` maps to a plain list.
ARRAY_CODES = [
    "B",
    "H",
    "I",
    "L",
    "Q",
    "O",
]

# Generic element type used by the helpers in this module.
T = TypeVar("T")
43
44
def array_or_list(
    code: str, contents: Iterable[int]
) -> "Union[List[int], array.ArrayType[int]]":
    """Build the storage selected by ``code`` and fill it with ``contents``.

    The code ``"O"`` produces a plain ``list``; any other code is passed to
    ``array.array`` as its typecode, so errors raised by ``array.array``
    (for example ``OverflowError`` for values that do not fit) propagate.
    """
    return list(contents) if code == "O" else array.array(code, contents)
51
52
def replace_all(
    ls: Sequence[T],
    replacements: Iterable[Tuple[int, int, Sequence[T]]],
) -> List[T]:
    """Return a new list with several slices of ``ls`` swapped out at once.

    Each entry of ``replacements`` is a ``(start, end, value)`` triple:
    ``ls[start:end]`` is replaced by the elements of ``value``. The triples
    are consumed in the order given, and ``ls`` itself is not modified.
    """
    pieces: List[T] = []
    cursor = 0
    length_delta = 0
    for start, end, substitute in replacements:
        # Copy the untouched stretch before this replacement, then the
        # replacement itself, and resume reading after the replaced slice.
        pieces.extend(ls[cursor:start])
        pieces.extend(substitute)
        cursor = end
        length_delta += len(substitute) - (end - start)
    pieces.extend(ls[cursor:])
    # Sanity check: the output length must match the sum of the size changes.
    assert len(pieces) == len(ls) + length_delta
    return pieces
73
74
# Maps each storage code to the one that follows it in ARRAY_CODES; the last
# code has no successor and therefore no entry.
NEXT_ARRAY_CODE = {
    current: following for current, following in zip(ARRAY_CODES, ARRAY_CODES[1:])
}
76
77
class IntList(Sequence[int]):
    """Class for storing a list of non-negative integers compactly.

    We store them as the smallest size integer array we can get
    away with. When we try to add an integer that is too large,
    we upgrade the array to the smallest word size needed to store
    the new value.

    The backing storage is whatever ``array_or_list`` returns for the first
    code in ``ARRAY_CODES`` that can hold every value: an ``array.array``,
    or a plain ``list`` (code ``"O"``) once no array type is wide enough.
    """

    __slots__ = ("__underlying",)

    __underlying: "Union[List[int], array.ArrayType[int]]"

    def __init__(self, values: Sequence[int] = ()):
        """Create an IntList holding ``values``.

        Raises ``ValueError`` if the values only fit in list storage and any
        of them is negative or not an ``int``.
        """
        for code in ARRAY_CODES:
            try:
                underlying = array_or_list(code, values)
                break
            except OverflowError:
                # Too large (or negative) for this typecode: try the next.
                pass
        else:  # pragma: no cover
            raise AssertionError(f"Could not create storage for {values!r}")
        if isinstance(underlying, list):
            # The list fallback accepts anything, so validate explicitly.
            for v in underlying:
                if not isinstance(v, int) or v < 0:
                    raise ValueError(f"Could not create IntList for {values!r}")
        self.__underlying = underlying

    @classmethod
    def of_length(cls, n: int) -> "IntList":
        """Return an IntList containing ``n`` zeros."""
        return cls(array_or_list("B", [0]) * n)

    def count(self, value: int) -> int:
        """Return the number of occurrences of ``value``."""
        return self.__underlying.count(value)

    def __repr__(self) -> str:
        return f"IntList({list(self.__underlying)!r})"

    def __len__(self) -> int:
        return len(self.__underlying)

    @overload
    def __getitem__(self, i: int) -> int: ...  # pragma: no cover

    @overload
    def __getitem__(self, i: slice) -> "IntList": ...  # pragma: no cover

    def __getitem__(self, i: Union[int, slice]) -> "Union[int, IntList]":
        """Return the element at index ``i``, or a new IntList for a slice."""
        if isinstance(i, slice):
            return IntList(self.__underlying[i])
        return self.__underlying[i]

    def __delitem__(self, i: int) -> None:
        del self.__underlying[i]

    def insert(self, i: int, v: int) -> None:
        """Insert ``v`` before index ``i``, widening the storage if needed.

        Like ``append`` and ``__setitem__``, a value too large for the
        current array type triggers an upgrade instead of an error. A
        negative value still raises ``OverflowError`` from array storage.
        """
        while True:
            try:
                # array.insert validates the value before resizing, so a
                # failed attempt leaves the storage untouched.
                self.__underlying.insert(i, v)
                return
            except OverflowError:
                if v < 0:
                    raise
                self.__upgrade()

    def __iter__(self) -> Iterator[int]:
        return iter(self.__underlying)

    def __eq__(self, other: object) -> bool:
        """Compare element-wise with another IntList.

        Storage type is an implementation detail, so two IntLists with the
        same elements are equal even if one is array-backed and the other
        list-backed.
        """
        if self is other:
            return True
        if not isinstance(other, IntList):
            return NotImplemented
        mine, theirs = self.__underlying, other.__underlying
        if isinstance(mine, list) != isinstance(theirs, list):
            # An array and a list never compare equal directly, so compare
            # their contents instead.
            return list(mine) == list(theirs)
        return mine == theirs

    def __ne__(self, other: object) -> bool:
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    def append(self, n: int) -> None:
        """Append ``n``, widening the storage if it does not fit.

        The value is appended directly (no placeholder element), so a
        failed append leaves the list unchanged.
        """
        while True:
            try:
                self.__underlying.append(n)
                return
            except OverflowError:
                assert n > 0
                self.__upgrade()

    def __setitem__(self, i: int, n: int) -> None:
        """Store ``n`` at index ``i``, widening the storage if needed."""
        while True:
            try:
                self.__underlying[i] = n
                return
            except OverflowError:
                assert n > 0
                self.__upgrade()

    def extend(self, ls: Iterable[int]) -> None:
        """Append every element of ``ls`` in order."""
        for n in ls:
            self.append(n)

    def __upgrade(self) -> None:
        """Move the contents to the next wider storage in ARRAY_CODES."""
        assert isinstance(self.__underlying, array.array)
        code = NEXT_ARRAY_CODE[self.__underlying.typecode]
        self.__underlying = array_or_list(code, self.__underlying)
175
def binary_search(lo: int, hi: int, f: Callable[[int], bool]) -> int:
    """Locate the point in ``[lo, hi)`` where ``f`` stops agreeing with ``f(lo)``.

    Returns an ``n`` with ``f(n) == f(lo)`` and ``f(n + 1) != f(lo)``.
    The caller is trusted that ``f(hi) != f(lo)``; this is never checked,
    and ``f(hi)`` itself is never evaluated.
    """
    baseline = f(lo)

    while hi - lo > 1:
        mid = (lo + hi) // 2
        # Keep the invariant: f(lo) matches the baseline, f(hi) does not.
        lo, hi = (mid, hi) if f(mid) == baseline else (lo, mid)
    return lo
192
193
def uniform(random: Random, n: int) -> bytes:
    """Draw ``n`` bytes from ``random`` and return them as a bytestring.

    The bytes are the big-endian encoding of ``random.getrandbits(n * 8)``.
    """
    bits = random.getrandbits(n * 8)
    return bits.to_bytes(n, "big")
197
198
class LazySequenceCopy:
    """A mutable view over a sequence that never touches the original.

    Writes are recorded in a mask keyed by underlying index, and pops are
    recorded as a sorted collection of removed underlying indices. Both are
    created lazily, so making the "copy" costs no more than taking the
    length of the source. Only ``len``, indexing, item assignment and
    ``pop`` are supported.
    """

    def __init__(self, values: Sequence[int]):
        self.__values = values
        self.__len = len(values)
        # Overridden values, keyed by index into the underlying sequence.
        self.__mask: Optional[Dict[int, int]] = None
        # Underlying indices that have been popped, kept in sorted order.
        self.__popped_indices: Optional[SortedList] = None

    def __len__(self) -> int:
        popped = self.__popped_indices
        removed = 0 if popped is None else len(popped)
        return self.__len - removed

    def pop(self, i: int = -1) -> int:
        """Remove and return the element at index ``i`` (default: the last).

        Raises ``IndexError`` if the sequence is empty or ``i`` is out of
        range.
        """
        if not len(self):
            raise IndexError("Cannot pop from empty list")
        target = self.__underlying_index(i)

        # Prefer a masked value; dropping it from the mask is fine because
        # the slot is about to be marked as popped.
        mask = self.__mask
        value = mask.pop(target, None) if mask is not None else None
        if value is None:
            value = self.__values[target]

        if self.__popped_indices is None:
            self.__popped_indices = SortedList()
        self.__popped_indices.add(target)
        return value

    def __getitem__(self, i: int) -> int:
        target = self.__underlying_index(i)

        fallback = self.__values[target]
        mask = self.__mask
        return fallback if mask is None else mask.get(target, fallback)

    def __setitem__(self, i: int, v: int) -> None:
        target = self.__underlying_index(i)
        mask = self.__mask
        if mask is None:
            mask = self.__mask = {}
        mask[target] = v

    def __underlying_index(self, i: int) -> int:
        """Translate a visible index into an index of the source sequence.

        Negative indices count from the end of the visible sequence.
        Raises ``IndexError`` when ``i`` is outside the visible range.
        """
        n = len(self)
        if not -n <= i < n:
            raise IndexError(f"Index {i} out of range [0, {n})")
        if i < 0:
            i += n
        assert 0 <= i < n

        popped = self.__popped_indices
        if popped is not None:
            # Each popped slot at or before the current position pushes the
            # wanted element one place further along the source. Example:
            #   l = [1, 4, 2, 10, 188]
            #   l.pop(3)
            #   l.pop(1)
            #   assert l == [1, 2, 188]
            # Visible index 1 must therefore map to underlying index 2.
            assert len(popped) <= len(self.__values)

            for removed_at in popped:
                if removed_at > i:
                    break
                i += 1
        return i
272
273
def clamp(lower: float, value: float, upper: float) -> float:
    """Restrict ``value`` to the closed interval ``[lower, upper]``.

    The upper bound is applied first and the lower bound second, so when
    ``lower > upper`` the result is ``lower``.
    """
    capped = min(value, upper)
    return max(lower, capped)
278
279
def swap(ls: LazySequenceCopy, i: int, j: int) -> None:
    """Exchange the elements at positions ``i`` and ``j`` of ``ls`` in place.

    Does nothing (not even an index check) when ``i == j``.
    """
    if i != j:
        at_j = ls[j]
        at_i = ls[i]
        ls[i] = at_j
        ls[j] = at_i
285
286
def stack_depth_of_caller() -> int:
    """Return the stack size as seen from the frame that called our caller.

    Walks the ``f_back`` chain by hand, starting two frames up. This comes
    from https://stackoverflow.com/a/47956089/9297601 and is a simple but
    much faster alternative to `len(inspect.stack(0))`. It is used together
    with get/set recursionlimit to make stack overflows non-flaky; see
    https://github.com/HypothesisWorks/hypothesis/issues/2494 for details.
    """
    depth = 1
    frame = sys._getframe(2)
    while frame is not None:
        depth += 1
        frame = frame.f_back  # type: ignore[assignment]
    return depth
301
302
class ensure_free_stackframes:
    """Context manager that raises the recursion limit for its body.

    On entry the limit is set to the current stack depth plus 2000, so the
    body has a reasonable number of free stackframes. On exit the previous
    limit is restored, unless something else changed it in the meantime.
    """

    def __enter__(self):
        depth_now = stack_depth_of_caller()
        self.old_maxdepth = sys.getrecursionlimit()
        # CPython defaults to a recursion limit of 1000 and pytest seems to
        # bump that to 3000 while tests run; pick something reasonable
        # relative to where we are right now.
        self.new_maxdepth = depth_now + 2000
        # Raising the limit could mask runaway recursion, so as a good
        # citizen refuse to continue if we are already unexpectedly deep.
        # With typical limits of 1000/3000 this only fires when something
        # really strange is going on; a deliberately deeply-recursive use
        # of this code is hard to imagine.
        assert depth_now <= 1000, (
            "Hypothesis would usually add %d to the stack depth of %d here, "
            "but we are already much deeper than expected. Aborting now, to "
            "avoid extending the stack limit in an infinite loop..."
            % (self.new_maxdepth - self.old_maxdepth, self.old_maxdepth)
        )
        sys.setrecursionlimit(self.new_maxdepth)

    def __exit__(self, *args, **kwargs):
        if sys.getrecursionlimit() != self.new_maxdepth:  # pragma: no cover
            # Somebody else moved the limit; leave their value in place.
            warnings.warn(
                "The recursion limit will not be reset, since it was changed "
                "from another thread or during execution of a test.",
                HypothesisWarning,
                stacklevel=2,
            )
            return
        sys.setrecursionlimit(self.old_maxdepth)
337
338
def find_integer(f: Callable[[int], bool]) -> int:
    """Search for an ``n`` with ``f(n)`` true and ``f(n + 1)`` false.

    The search tries to make ``n`` large. ``f(0)`` is taken to be true
    without ever being called.
    """
    # Small answers are checked one by one: when the result is tiny there is
    # little to gain from cleverness, and probing ahead (say, trying 2 when
    # the answer is 0) would only waste calls.
    for candidate in (1, 2, 3, 4):
        if not f(candidate):
            return candidate - 1

    # f(4) holds. `known_true` is the largest value seen to satisfy f;
    # `known_false` becomes the smallest value seen to fail it.
    known_true, known_false = 4, 5

    # Grow the probe exponentially until f fails.
    while f(known_false):
        known_true, known_false = known_false, known_false * 2

    # Bisect the gap until the two bounds are adjacent, at which point
    # f(known_true) holds and f(known_true + 1) does not.
    while known_false - known_true > 1:
        midpoint = (known_true + known_false) // 2
        if f(midpoint):
            known_true = midpoint
        else:
            known_false = midpoint
    return known_true
375
376
class NotFound(Exception):
    """Raised by ``SelfOrganisingList.find`` when no value matches."""
379
380
class SelfOrganisingList(Generic[T]):
    """A self-organising list using the move-to-front heuristic.

    The list exists to answer "give me some item satisfying this predicate".
    Predicates are arbitrary, so nothing beats a linear scan, but how long
    the scan takes depends heavily on the order: a hit on the first item
    makes the lookup O(1). A self-organising list reorders itself to make
    such lucky hits more likely.

    Of the possible heuristics (it is not clear which is best) we use the
    simplest: whenever an item is found it is moved to the "front". In this
    implementation the front is the end of the backing list, because scans
    run from the end towards the start.
    """

    def __init__(self, values: Iterable[T] = ()) -> None:
        self.__values = list(values)

    def __repr__(self) -> str:
        return f"SelfOrganisingList({self.__values!r})"

    def add(self, value: T) -> None:
        """Insert ``value`` at the front, so it is the next one scanned."""
        self.__values.append(value)

    def find(self, condition: Callable[[T], bool]) -> T:
        """Return a stored value for which ``condition(value)`` is true.

        The returned value is moved to the front of the list. Raises
        ``NotFound`` when no stored value satisfies the condition.
        """
        values = self.__values
        for position in reversed(range(len(values))):
            candidate = values[position]
            if condition(candidate):
                # Move-to-front: take it out and put it back at the end.
                del values[position]
                values.append(candidate)
                return candidate
        raise NotFound("No values satisfying condition")
418
419
# Module-level state for gc_cumulative_time().
# True once the timing hooks have been installed (or found unavailable).
_gc_initialized = False
# perf_counter() reading taken at the most recent "start" phase; 0 means
# there is no matching start to pair a "stop" with.
_gc_start = 0
# Running total of time attributed to garbage collection.
_gc_cumulative_time = 0


def gc_cumulative_time() -> float:
    """Return the total garbage-collection time recorded so far.

    The first call installs the measuring hook: a ``gc.callbacks`` entry
    where that attribute exists, otherwise ``gc.hooks`` callbacks where
    those exist. Collections that finished before the first call are
    therefore not counted. Later calls just return the running total.

    With ``gc.callbacks`` the total is a sum of ``time.perf_counter()``
    differences between the "start" and "stop" phases. With ``gc.hooks``
    it is a sum of the reported ``stats.duration`` values.
    NOTE(review): the unit of ``stats.duration`` is not visible here;
    confirm it matches ``perf_counter`` seconds.
    """
    global _gc_initialized
    if not _gc_initialized:
        if hasattr(gc, "callbacks"):
            # CPython
            # The callback receives the phase name and an info object;
            # only the phase is used.
            def gc_callback(phase, info):
                global _gc_start, _gc_cumulative_time
                try:
                    now = time.perf_counter()
                    if phase == "start":
                        _gc_start = now
                    elif phase == "stop" and _gc_start > 0:
                        _gc_cumulative_time += now - _gc_start  # pragma: no cover # ??
                except RecursionError:  # pragma: no cover
                    # Avoid flakiness via UnraisableException, which is caught and
                    # warned by pytest. The actual callback (this function) is
                    # validated to never trigger a RecursionError itself
                    # when called by gc.collect.
                    # Anyway, we should hit the same error on "start"
                    # and "stop", but to ensure we don't get out of sync we just
                    # signal that there is no matching start.
                    _gc_start = 0
                    return

            # Registered at the front of the callback list.
            gc.callbacks.insert(0, gc_callback)
        elif hasattr(gc, "hooks"):  # pragma: no cover  # pypy only
            # PyPy
            # The hook adds the duration reported for each GC event.
            def hook(stats):
                global _gc_cumulative_time
                try:
                    _gc_cumulative_time += stats.duration
                except RecursionError:
                    pass

            # Only fill hook slots that are currently unset, so hooks
            # installed by someone else are left alone.
            if gc.hooks.on_gc_minor is None:
                gc.hooks.on_gc_minor = hook
            if gc.hooks.on_gc_collect_step is None:
                gc.hooks.on_gc_collect_step = hook

        # Set even when neither mechanism exists, so detection runs once.
        _gc_initialized = True

    return _gc_cumulative_time