1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""A module for miscellaneous useful bits and bobs that don't
12obviously belong anywhere else. If you spot a better home for
13anything that lives here, please move it."""
14
15import array
16import gc
17import sys
18import time
19import warnings
20from array import ArrayType
21from collections.abc import Iterable, Iterator, Sequence
22from threading import Lock
23from typing import (
24 Any,
25 Callable,
26 ClassVar,
27 Generic,
28 Literal,
29 Optional,
30 TypeVar,
31 Union,
32 overload,
33)
34
35from sortedcontainers import SortedList
36
37from hypothesis.errors import HypothesisWarning
38
# Generic element type shared by the sequence helpers in this module.
T = TypeVar("T")

41
def replace_all(
    ls: Sequence[T],
    replacements: Iterable[tuple[int, int, Sequence[T]]],
) -> list[T]:
    """Substitute multiple replacement values into a list.

    ``replacements`` is an iterable of ``(start, end, value)`` triples:
    each half-open region ``ls[start:end]`` is replaced by ``value``.
    The triples are assumed to be in ascending, non-overlapping order.
    Returns a new list; ``ls`` itself is not modified.
    """

    result: list[T] = []
    cursor = 0
    expected_len = len(ls)
    for start, end, value in replacements:
        # Copy the untouched gap before this replacement, then the value.
        result += ls[cursor:start]
        result += value
        cursor = end
        expected_len += len(value) - (end - start)
    # Copy whatever remains after the final replacement.
    result += ls[cursor:]
    assert len(result) == expected_len
    return result
62
63
class IntList(Sequence[int]):
    """Compact storage for a sequence of non-negative integers.

    Values are kept in the smallest ``array.array`` word size that can hold
    all of them. Storing a value too large for the current word size
    transparently upgrades the storage to the next larger typecode,
    eventually falling back to a plain Python list (code ``"O"``) for
    arbitrarily large integers."""

    # Unsigned array typecodes in increasing width, with "O" as the
    # plain-Python-object (list) fallback.
    ARRAY_CODES: ClassVar[list[str]] = ["B", "H", "I", "L", "Q", "O"]
    NEXT_ARRAY_CODE: ClassVar[dict[str, str]] = dict(zip(ARRAY_CODES, ARRAY_CODES[1:]))

    __slots__ = ("__underlying",)

    def __init__(self, values: Sequence[int] = ()):
        underlying = None
        for code in self.ARRAY_CODES:
            try:
                underlying = self._array_or_list(code, values)
            except OverflowError:
                # Some value didn't fit in this word size; try the next one.
                continue
            break
        if underlying is None:  # pragma: no cover
            raise AssertionError(f"Could not create storage for {values!r}")
        if isinstance(underlying, list):
            # The list fallback accepts anything, so validate by hand that we
            # really were given non-negative integers.
            for v in underlying:
                if not isinstance(v, int) or v < 0:
                    raise ValueError(f"Could not create IntList for {values!r}")
        self.__underlying: Union[list[int], "ArrayType[int]"] = underlying

    @classmethod
    def of_length(cls, n: int) -> "IntList":
        """Create an IntList of n zeros."""
        # bytes(n) is n zero bytes, which the "B" array accepts directly.
        return cls(array.array("B", bytes(n)))

    @staticmethod
    def _array_or_list(
        code: str, contents: Iterable[int]
    ) -> Union[list[int], "ArrayType[int]"]:
        # "O" is our sentinel for "arbitrary Python ints", i.e. a list.
        return list(contents) if code == "O" else array.array(code, contents)

    def count(self, value: int) -> int:
        return self.__underlying.count(value)

    def __repr__(self) -> str:
        return f"IntList({list(self.__underlying)!r})"

    def __len__(self) -> int:
        return len(self.__underlying)

    @overload
    def __getitem__(self, i: int) -> int: ...  # pragma: no cover

    @overload
    def __getitem__(
        self, i: slice
    ) -> Union[list[int], "ArrayType[int]"]: ...  # pragma: no cover

    def __getitem__(
        self, i: Union[int, slice]
    ) -> Union[int, list[int], "ArrayType[int]"]:
        return self.__underlying[i]

    def __delitem__(self, i: Union[int, slice]) -> None:
        del self.__underlying[i]

    def insert(self, i: int, v: int) -> None:
        self.__underlying.insert(i, v)

    def __iter__(self) -> Iterator[int]:
        return iter(self.__underlying)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, IntList):
            return NotImplemented
        return self is other or self.__underlying == other.__underlying

    def __ne__(self, other: object) -> bool:
        if not isinstance(other, IntList):
            return NotImplemented
        return self is not other and self.__underlying != other.__underlying

    def append(self, n: int) -> None:
        """Append n, upgrading the underlying storage if it cannot hold it."""
        try:
            # Fast path: n fits the current word size.
            self.__underlying.append(n)
        except OverflowError:
            # Slow path: append a placeholder and go through __setitem__,
            # which knows how to upgrade the underlying array.
            position = len(self.__underlying)
            self.__underlying.append(0)
            self[position] = n

    def __setitem__(self, i: int, n: int) -> None:
        while True:
            try:
                self.__underlying[i] = n
            except OverflowError:
                # n doesn't fit the current word size: widen and retry.
                assert n > 0
                self.__upgrade()
            else:
                return

    def extend(self, ls: Iterable[int]) -> None:
        # Element-by-element so that any oversized value triggers an upgrade.
        for n in ls:
            self.append(n)

    def __upgrade(self) -> None:
        # Upgrades are one-way: once we reach the plain-list representation
        # there is no wider typecode left (and no need for one).
        assert isinstance(self.__underlying, array.array)
        code = self.NEXT_ARRAY_CODE[self.__underlying.typecode]
        self.__underlying = self._array_or_list(code, self.__underlying)
176
177
def binary_search(lo: int, hi: int, f: Callable[[int], bool]) -> int:
    """Binary search in [lo, hi) for the last n agreeing with f(lo).

    Returns n such that f(n) == f(lo) but f(n + 1) != f(lo). It is
    implicitly assumed, and not checked, that f(hi) != f(lo).
    """

    reference = f(lo)

    # Invariant: f(lo) == reference and f(hi) != reference.
    while hi - lo > 1:
        midpoint = lo + (hi - lo) // 2
        if f(midpoint) == reference:
            lo = midpoint
        else:
            hi = midpoint
    return lo
194
195
class LazySequenceCopy(Generic[T]):
    """A "copy" of a sequence that layers a mask over the underlying
    sequence, so that mutations land in the mask and the underlying
    sequence is never changed. Effectively behaves as if you could do
    list(x) in O(1) time. Only part of the list API is supported, but
    there is no reason in principle the rest could not be added."""

    def __init__(self, values: Sequence[T]):
        self.__values = values
        self.__len = len(values)
        # Lazily-created overrides: index -> replacement value.
        self.__mask: Optional[dict[int, T]] = None
        # Lazily-created sorted set of underlying indices removed by pop().
        self.__popped_indices: Optional[SortedList[int]] = None

    def __len__(self) -> int:
        popped = self.__popped_indices
        return self.__len if popped is None else self.__len - len(popped)

    def pop(self, i: int = -1) -> T:
        if not len(self):
            raise IndexError("Cannot pop from empty list")
        i = self.__underlying_index(i)

        # Prefer a masked (overridden) value; fall back to the original.
        v = None if self.__mask is None else self.__mask.pop(i, None)
        if v is None:
            v = self.__values[i]

        if self.__popped_indices is None:
            self.__popped_indices = SortedList()
        self.__popped_indices.add(i)
        return v

    def swap(self, i: int, j: int) -> None:
        """Swap the elements ls[i], ls[j]."""
        if i != j:
            self[i], self[j] = self[j], self[i]

    def __getitem__(self, i: int) -> T:
        i = self.__underlying_index(i)
        if self.__mask is not None:
            return self.__mask.get(i, self.__values[i])
        return self.__values[i]

    def __setitem__(self, i: int, v: T) -> None:
        i = self.__underlying_index(i)
        mask = self.__mask
        if mask is None:
            mask = self.__mask = {}
        mask[i] = v

    def __underlying_index(self, i: int) -> int:
        n = len(self)
        if not -n <= i < n:
            raise IndexError(f"Index {i} out of range [0, {n})")
        if i < 0:
            i += n
        assert 0 <= i < n

        if self.__popped_indices is not None:
            # given an index i in the popped representation of the list, compute
            # its corresponding index in the underlying list. given
            #   l = [1, 4, 2, 10, 188]
            #   l.pop(3)
            #   l.pop(1)
            #   assert l == [1, 2, 188]
            #
            # we want l[i] == self.__values[f(i)], where f is this function.
            # Each popped index at or before our current position shifts the
            # underlying index up by one.
            assert len(self.__popped_indices) <= len(self.__values)

            for popped in self.__popped_indices:
                if popped > i:
                    break
                i += 1
        return i

    # even though we have len + getitem, mypyc requires iter.
    def __iter__(self) -> Iterable[T]:
        for index in range(len(self)):
            yield self[index]
280
281
def stack_depth_of_caller() -> int:
    """Get stack size for caller's frame.

    From https://stackoverflow.com/a/47956089/9297601 , this is a simple
    but much faster alternative to `len(inspect.stack(0))`. We use it
    with get/set recursionlimit to make stack overflows non-flaky; see
    https://github.com/HypothesisWorks/hypothesis/issues/2494 for details.
    """
    # Start two frames up (our caller's caller) and walk to the bottom.
    depth = 1
    frame = sys._getframe(2)
    while frame is not None:
        depth += 1
        frame = frame.f_back  # type: ignore[assignment]
    return depth
296
297
class StackframeLimiter:
    # StackframeLimiter is used to make the recursion limit warning issued via
    # ensure_free_stackframes thread-safe. We track the known values we have
    # passed to sys.setrecursionlimit in _known_limits, and only issue a warning
    # if sys.getrecursionlimit is not in _known_limits.
    #
    # This will always be an under-approximation of when we would ideally issue
    # this warning, since a non-hypothesis caller could coincidentally set the
    # recursion limit to one of our known limits. Currently, StackframeLimiter
    # resets _known_limits whenever all of the ensure_free_stackframes contexts
    # have exited. We could increase the power of the warning by tracking a
    # refcount for each limit, and removing it as soon as the refcount hits zero.
    # I didn't think this extra complexity is worth the minor power increase for
    # what is already only a "nice to have" warning.

    def __init__(self):
        # Number of ensure_free_stackframes contexts currently entered.
        self._active_contexts = 0
        # Every value we have passed to sys.setrecursionlimit since the last
        # time all contexts exited.
        self._known_limits: set[int] = set()
        # The recursion limit in effect before the first context entered;
        # restored when the last context exits.
        self._original_limit: Optional[int] = None

    def _setrecursionlimit(self, new_limit: int, *, check: bool = True) -> None:
        """Set the recursion limit, unless (with check=True) the current
        limit is not one we set ourselves — in that case someone else changed
        it mid-test, so warn and leave their value alone."""
        if check and sys.getrecursionlimit() not in self._known_limits:
            warnings.warn(
                "The recursion limit will not be reset, since it was changed "
                "during test execution.",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        self._known_limits.add(new_limit)
        sys.setrecursionlimit(new_limit)

    def enter_context(self, new_limit: int, *, current_limit: int) -> None:
        """Register one more active context and raise the limit to new_limit.

        Called by ensure_free_stackframes.__enter__ while holding
        _stackframe_limiter_lock.
        """
        if self._active_contexts == 0:
            # this is the first context on the stack. Record the true original
            # limit, to restore later.
            assert self._original_limit is None
            self._original_limit = current_limit
            self._known_limits.add(self._original_limit)

        self._active_contexts += 1
        self._setrecursionlimit(new_limit)

    def exit_context(self, new_limit: int, *, check: bool = True) -> None:
        """Unregister one active context, restoring the original limit when
        the last context exits and new_limit otherwise.

        Called by ensure_free_stackframes.__exit__ while holding
        _stackframe_limiter_lock.
        """
        assert self._active_contexts > 0
        self._active_contexts -= 1

        if self._active_contexts == 0:
            # this is the last context to exit. Restore the true original
            # limit and clear our known limits.
            original_limit = self._original_limit
            assert original_limit is not None
            try:
                self._setrecursionlimit(original_limit, check=check)
            finally:
                self._original_limit = None
                # we want to clear the known limits, but preserve the limit
                # we just set it to as known.
                self._known_limits = {original_limit}
        else:
            self._setrecursionlimit(new_limit, check=check)
360
361
# Process-wide singleton: all ensure_free_stackframes contexts share one
# limiter, and this lock serializes their recursion-limit bookkeeping.
_stackframe_limiter = StackframeLimiter()
_stackframe_limiter_lock = Lock()
364
365
class ensure_free_stackframes:
    """Context manager that ensures there are at least N free stackframes (for
    a reasonable value of N).

    On entry, raises the interpreter recursion limit to the current stack
    depth plus 2000 via the process-global StackframeLimiter; on exit, hands
    the previous limit back to the limiter for restoration.
    """

    def __enter__(self) -> None:
        cur_depth = stack_depth_of_caller()
        with _stackframe_limiter_lock:
            self.old_limit = sys.getrecursionlimit()
            # The default CPython recursionlimit is 1000, but pytest seems to bump
            # it to 3000 during test execution. Let's make it something reasonable:
            self.new_limit = cur_depth + 2000
            # Because we add to the recursion limit, to be good citizens we also
            # add a check for unbounded recursion. The default limit is typically
            # 1000/3000, so this can only ever trigger if something really strange
            # is happening and it's hard to imagine an
            # intentionally-deeply-recursive use of this code.
            assert cur_depth <= 1000, (
                "Hypothesis would usually add %d to the stack depth of %d here, "
                "but we are already much deeper than expected. Aborting now, to "
                "avoid extending the stack limit in an infinite loop..."
                % (self.new_limit - self.old_limit, self.old_limit)
            )
            try:
                _stackframe_limiter.enter_context(
                    self.new_limit, current_limit=self.old_limit
                )
            except Exception:
                # if the stackframe limiter raises a HypothesisWarning (under eg
                # -Werror), __exit__ is not called, since we errored in __enter__.
                # Preserve the state of the stackframe limiter by exiting, and
                # avoid showing a duplicate warning with check=False.
                _stackframe_limiter.exit_context(self.old_limit, check=False)
                raise

    def __exit__(self, *args, **kwargs):
        # Hand the pre-entry limit back to the limiter; it decides whether to
        # restore the true original or this value, depending on nesting.
        with _stackframe_limiter_lock:
            _stackframe_limiter.exit_context(self.old_limit)
404
405
def find_integer(f: Callable[[int], bool]) -> int:
    """Finds a (hopefully large) integer such that f(n) is True and f(n + 1)
    is False.

    f(0) is assumed to be True and will not be checked.
    """
    # Linear scan over the tiny values first: when the answer is small,
    # anything cleverer does strictly more work than just counting upwards.
    # If the result is 0 and we tried 2 first, we'd have done twice the
    # work needed!
    for small in range(1, 5):
        if not f(small):
            return small - 1

    # We now know f(4) holds. Probe upwards by doubling until we overshoot,
    # maintaining the invariant that f(lo) is known True and, once the loop
    # exits, f(hi) is known False.
    lo, hi = 4, 5
    while f(hi):
        lo, hi = hi, hi * 2

    # Bisect until lo and hi are adjacent: then f(lo) and not f(lo + 1),
    # as desired.
    while hi - lo > 1:
        mid = lo + (hi - lo) // 2
        if f(mid):
            lo = mid
        else:
            hi = mid
    return lo
442
443
class NotFound(Exception):
    """Raised by SelfOrganisingList.find when no value satisfies the predicate."""
446
447
class SelfOrganisingList(Generic[T]):
    """A self-organising list with the move-to-front heuristic.

    A self-organising list is a collection from which we want to retrieve
    items satisfying some predicate. A linear scan is unavoidable (the
    predicates are arbitrary), but its cost varies wildly: finding a good
    item on the first probe is O(1). A self-organising list reorders itself
    so that we get lucky as often as possible.

    Of the various possible heuristics we use the simplest: every found item
    is moved to the "front" — which is actually the back here, because we
    iterate in reverse.
    """

    def __init__(self, values: Iterable[T] = ()) -> None:
        self.__values = list(values)

    def __repr__(self) -> str:
        return f"SelfOrganisingList({self.__values!r})"

    def add(self, value: T) -> None:
        """Add a value to this list."""
        self.__values.append(value)

    def find(self, condition: Callable[[T], bool]) -> T:
        """Return some value in this list such that ``condition(value)`` is
        True, moving it to the front. If no such value exists, raise
        ``NotFound``."""
        values = self.__values
        for index in reversed(range(len(values))):
            candidate = values[index]
            if condition(candidate):
                # Move-to-front: delete in place, re-append at the end
                # (the end is scanned first).
                del values[index]
                values.append(candidate)
                return candidate
        raise NotFound("No values satisfying condition")
485
486
# Shared state for gc_cumulative_time, initialized lazily on first call.
_gc_initialized = False
# perf_counter timestamp of the most recent GC "start" callback (CPython path).
_gc_start: float = 0
# Total seconds observed inside garbage collection since hooks were installed.
_gc_cumulative_time: float = 0

# Since gc_callback potentially runs in test context, and perf_counter
# might be monkeypatched, we store a reference to the real one.
_perf_counter = time.perf_counter
494
495
def gc_cumulative_time() -> float:
    """Return the total time (in seconds) spent in garbage collection since
    Hypothesis installed its GC hooks.

    The first call installs the hooks (CPython ``gc.callbacks`` or PyPy
    ``gc.hooks``) and returns the time accumulated so far (0.0 initially).
    """
    global _gc_initialized

    # I don't believe we need a lock for the _gc_cumulative_time increment here,
    # since afaik each gc callback is only executed once when the garbage collector
    # runs, by the thread which initiated the gc.

    if not _gc_initialized:
        if hasattr(gc, "callbacks"):
            # CPython: callbacks fire around every collection with a
            # "start"/"stop" phase; we time the interval between them.
            def gc_callback(
                phase: Literal["start", "stop"], info: dict[str, int]
            ) -> None:
                global _gc_start, _gc_cumulative_time
                try:
                    now = _perf_counter()
                    if phase == "start":
                        _gc_start = now
                    elif phase == "stop" and _gc_start > 0:
                        _gc_cumulative_time += now - _gc_start  # pragma: no cover # ??
                except RecursionError:  # pragma: no cover
                    # Avoid flakiness via UnraisableException, which is caught and
                    # warned by pytest. The actual callback (this function) is
                    # validated to never trigger a RecursionError itself when
                    # called by gc.collect.
                    # Anyway, we should hit the same error on "start"
                    # and "stop", but to ensure we don't get out of sync we just
                    # signal that there is no matching start.
                    _gc_start = 0
                    return

            gc.callbacks.insert(0, gc_callback)
        elif hasattr(gc, "hooks"):  # pragma: no cover  # pypy only
            # PyPy: hooks report the duration of each minor/major GC step.
            def hook(stats: Any) -> None:
                global _gc_cumulative_time
                try:
                    _gc_cumulative_time += stats.duration
                except RecursionError:
                    pass

            # Only install our hook if nothing else has claimed the slot.
            if gc.hooks.on_gc_minor is None:
                gc.hooks.on_gc_minor = hook
            if gc.hooks.on_gc_collect_step is None:
                gc.hooks.on_gc_collect_step = hook

        _gc_initialized = True

    return _gc_cumulative_time
545
546
def startswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if sequence ``l1`` begins with the elements of ``l2``.

    Compares elementwise, so it works across sequence types (e.g. a list
    prefix of a tuple), unlike ``l1[: len(l2)] == l2``.
    """
    if len(l1) < len(l2):
        return False
    # zip already stops at the shorter input, so slicing l1 (as in
    # l1[: len(l2)]) would only add an unnecessary O(len(l2)) copy.
    return all(v1 == v2 for v1, v2 in zip(l1, l2))
551
552
def endswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if sequence ``l1`` ends with the elements of ``l2``,
    compared elementwise."""
    offset = len(l1) - len(l2)
    if offset < 0:
        return False
    # Compare the tail of l1 against l2 by index, without slicing.
    return all(l1[offset + k] == l2[k] for k in range(len(l2)))
557
558
def bits_to_bytes(n: int) -> int:
    """Return how many bytes are needed to represent an n-bit number.

    Mathematically this is ceil(n / 8), i.e. (n + 7) // 8; the shift form
    is kept because this helper is called often enough for the small speed
    difference to matter."""
    return (n + 7) >> 3