1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""A module for miscellaneous useful bits and bobs that don't
12obviously belong anywhere else. If you spot a better home for
13anything that lives here, please move it."""
14
15import array
16import gc
17import itertools
18import sys
19import time
20import warnings
21from array import ArrayType
22from collections.abc import Callable, Iterable, Iterator, Sequence
23from threading import Lock
24from typing import (
25 Any,
26 ClassVar,
27 Generic,
28 Literal,
29 TypeVar,
30 Union,
31 overload,
32)
33
34from sortedcontainers import SortedList
35
36from hypothesis.errors import HypothesisWarning
37
38T = TypeVar("T")
39
40
def replace_all(
    ls: Sequence[T],
    replacements: Iterable[tuple[int, int, Sequence[T]]],
) -> list[T]:
    """Substitute multiple replacement values into a list.

    Replacements is a list of (start, end, value) triples.
    """
    out: list[T] = []
    cursor = 0
    delta = 0
    for start, end, value in replacements:
        # Copy the untouched gap before this replacement, then the
        # replacement itself, and remember how far we've consumed.
        out.extend(ls[cursor:start])
        out.extend(value)
        cursor = end
        delta += len(value) - (end - start)
    # Whatever remains after the final replacement is copied verbatim.
    out.extend(ls[cursor:])
    assert len(out) == len(ls) + delta
    return out
61
62
class IntList(Sequence[int]):
    """Class for storing a list of non-negative integers compactly.

    We store them as the smallest size integer array we can get
    away with. When we try to add an integer that is too large,
    we upgrade the array to the smallest word size needed to store
    the new value."""

    # Supported array.array typecodes, narrowest first. "O" is not a real
    # typecode: it marks the final fallback to a plain Python list, which
    # can hold arbitrarily large ints.
    ARRAY_CODES: ClassVar[list[str]] = ["B", "H", "I", "L", "Q", "O"]
    # Each typecode mapped to the next-wider one; used by __upgrade.
    NEXT_ARRAY_CODE: ClassVar[dict[str, str]] = dict(itertools.pairwise(ARRAY_CODES))

    __slots__ = ("__underlying",)

    def __init__(self, values: Sequence[int] = ()):
        # Find the narrowest storage that fits all of `values`:
        # array.array raises OverflowError when a value does not fit
        # the typecode, so we simply try codes in increasing width.
        for code in self.ARRAY_CODES:
            try:
                underlying = self._array_or_list(code, values)
                break
            except OverflowError:
                pass
        else:  # pragma: no cover
            raise AssertionError(f"Could not create storage for {values!r}")
        if isinstance(underlying, list):
            # The list fallback accepts any object, so validate by hand that
            # we were really given non-negative integers.
            for v in underlying:
                if not isinstance(v, int) or v < 0:
                    raise ValueError(f"Could not create IntList for {values!r}")
        self.__underlying: list[int] | ArrayType[int] = underlying

    @classmethod
    def of_length(cls, n: int) -> "IntList":
        """Return an IntList of ``n`` zeros (stored at the narrowest width)."""
        return cls(array.array("B", [0]) * n)

    @staticmethod
    def _array_or_list(
        code: str, contents: Iterable[int]
    ) -> Union[list[int], "ArrayType[int]"]:
        # "O" is our pseudo-typecode meaning "plain Python list".
        if code == "O":
            return list(contents)
        return array.array(code, contents)

    def count(self, value: int) -> int:
        """Return the number of occurrences of ``value``."""
        return self.__underlying.count(value)

    def __repr__(self) -> str:
        return f"IntList({list(self.__underlying)!r})"

    def __len__(self) -> int:
        return len(self.__underlying)

    @overload
    def __getitem__(self, i: int) -> int: ...  # pragma: no cover

    @overload
    def __getitem__(
        self, i: slice
    ) -> "list[int] | ArrayType[int]": ...  # pragma: no cover

    def __getitem__(self, i: int | slice) -> "int | list[int] | ArrayType[int]":
        # Slicing returns the raw underlying storage type, not an IntList.
        return self.__underlying[i]

    def __delitem__(self, i: int | slice) -> None:
        del self.__underlying[i]

    def insert(self, i: int, v: int) -> None:
        # NOTE(review): unlike append/__setitem__, this does not upgrade the
        # storage on overflow — presumably callers only insert values already
        # known to fit the current typecode. TODO confirm.
        self.__underlying.insert(i, v)

    def __iter__(self) -> Iterator[int]:
        return iter(self.__underlying)

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, IntList):
            return NotImplemented
        # Comparing the underlying containers works even when they are at
        # different widths, since array/list comparison is element-wise.
        return self.__underlying == other.__underlying

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, IntList):
            return NotImplemented
        return self.__underlying != other.__underlying

    def append(self, n: int) -> None:
        # try the fast path of appending n first. If this overflows, use the
        # __setitem__ path, which will upgrade the underlying array.
        try:
            self.__underlying.append(n)
        except OverflowError:
            i = len(self.__underlying)
            self.__underlying.append(0)
            self[i] = n

    def __setitem__(self, i: int, n: int) -> None:
        # Keep widening the storage until the value fits. Terminates because
        # the widest storage ("O", a plain list) never overflows.
        while True:
            try:
                self.__underlying[i] = n
                return
            except OverflowError:
                # Only a too-large positive value can overflow here, since
                # this class stores non-negative integers.
                assert n > 0
                self.__upgrade()

    def extend(self, ls: Iterable[int]) -> None:
        for n in ls:
            self.append(n)

    def __upgrade(self) -> None:
        # Replace the underlying array with the next-wider storage type,
        # copying the existing contents across.
        assert isinstance(self.__underlying, array.array)
        code = self.NEXT_ARRAY_CODE[self.__underlying.typecode]
        self.__underlying = self._array_or_list(code, self.__underlying)
173
174
def binary_search(lo: int, hi: int, f: Callable[[int], bool]) -> int:
    """Binary searches in [lo , hi) to find
    n such that f(n) == f(lo) but f(n + 1) != f(lo).
    It is implicitly assumed and will not be checked
    that f(hi) != f(lo).
    """
    target = f(lo)

    # Invariant: f(lo) == target and (by assumption) f(hi) != target.
    while hi - lo > 1:
        midpoint = (lo + hi) // 2
        if f(midpoint) == target:
            lo = midpoint
        else:
            hi = midpoint
    return lo
191
192
class LazySequenceCopy(Generic[T]):
    """A "copy" of a sequence that works by inserting a mask in front
    of the underlying sequence, so that you can mutate it without changing
    the underlying sequence. Effectively behaves as if you could do list(x)
    in O(1) time. The full list API is not supported yet but there's no reason
    in principle it couldn't be."""

    def __init__(self, values: Sequence[T]):
        self.__values = values
        self.__len = len(values)
        # Sparse overlay of index -> replacement value, created lazily on
        # first __setitem__ so an unmodified copy stays O(1).
        self.__mask: dict[int, T] | None = None
        # Indices into the underlying sequence that have been popped, kept
        # sorted for the index translation in __underlying_index.
        self.__popped_indices: SortedList[int] | None = None

    def __len__(self) -> int:
        if self.__popped_indices is None:
            return self.__len
        return self.__len - len(self.__popped_indices)

    def pop(self, i: int = -1) -> T:
        """Remove and return the element at index ``i`` (last by default).

        Raises IndexError if the copy is empty or ``i`` is out of range.
        """
        if len(self) == 0:
            raise IndexError("Cannot pop from empty list")
        i = self.__underlying_index(i)

        # Prefer the masked (mutated) value, falling back to the underlying
        # sequence. We test membership explicitly rather than using
        # .pop(i, None), so that a masked value which is itself None is
        # returned correctly instead of being mistaken for "not masked".
        if self.__mask is not None and i in self.__mask:
            v = self.__mask.pop(i)
        else:
            v = self.__values[i]

        if self.__popped_indices is None:
            self.__popped_indices = SortedList()
        self.__popped_indices.add(i)
        return v

    def swap(self, i: int, j: int) -> None:
        """Swap the elements ls[i], ls[j]."""
        if i == j:
            return
        self[i], self[j] = self[j], self[i]

    def __getitem__(self, i: int) -> T:
        i = self.__underlying_index(i)

        default = self.__values[i]
        if self.__mask is None:
            return default
        else:
            return self.__mask.get(i, default)

    def __setitem__(self, i: int, v: T) -> None:
        i = self.__underlying_index(i)
        if self.__mask is None:
            self.__mask = {}
        self.__mask[i] = v

    def __underlying_index(self, i: int) -> int:
        # Normalize (possibly negative) i against the *popped* length, then
        # translate it into an index of the underlying sequence.
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [0, {n})")
        if i < 0:
            i += n
        assert 0 <= i < n

        if self.__popped_indices is not None:
            # given an index i in the popped representation of the list, compute
            # its corresponding index in the underlying list. given
            #   l = [1, 4, 2, 10, 188]
            #   l.pop(3)
            #   l.pop(1)
            #   assert l == [1, 2, 188]
            #
            # we want l[i] == self.__values[f(i)], where f is this function.
            assert len(self.__popped_indices) <= len(self.__values)

            # Each popped index at or before the running position shifts the
            # logical index one further to the right in the underlying list.
            for idx in self.__popped_indices:
                if idx > i:
                    break
                i += 1
        return i

    # even though we have len + getitem, mypyc requires iter.
    def __iter__(self) -> Iterator[T]:
        for i in range(len(self)):
            yield self[i]
277
278
def stack_depth_of_caller() -> int:
    """Get stack size for caller's frame.

    From https://stackoverflow.com/a/47956089/9297601 , this is a simple
    but much faster alternative to `len(inspect.stack(0))`. We use it
    with get/set recursionlimit to make stack overflows non-flaky; see
    https://github.com/HypothesisWorks/hypothesis/issues/2494 for details.
    """
    # Start two frames up (the caller of our caller) and walk to the root.
    depth = 1
    frame = sys._getframe(2)
    while frame is not None:
        frame = frame.f_back
        depth += 1
    return depth
293
294
class StackframeLimiter:
    """Thread-safe bookkeeping for the recursion-limit warning issued via
    ensure_free_stackframes.

    Every value we pass to sys.setrecursionlimit is remembered in
    ``_known_limits``, and we only warn when the current limit is not one
    we set ourselves (i.e. somebody else changed it during the test).

    This is an under-approximation of when we would ideally warn, since an
    unrelated caller could coincidentally set the recursion limit to one of
    our known values. The set is reset once every ensure_free_stackframes
    context has exited; a per-limit refcount would be more precise, but the
    extra complexity isn't worth it for a nice-to-have warning.
    """

    def __init__(self):
        self._active_contexts = 0
        self._known_limits: set[int] = set()
        self._original_limit: int | None = None

    def _setrecursionlimit(self, new_limit: int, *, check: bool = True) -> None:
        if check:
            current_limit = sys.getrecursionlimit()
            if current_limit not in self._known_limits:
                # Somebody outside Hypothesis changed the limit mid-test;
                # warn and leave the limit alone.
                warnings.warn(
                    "The recursion limit will not be reset, since it was changed "
                    f"during test execution (from {self._original_limit} to {current_limit}).",
                    HypothesisWarning,
                    stacklevel=4,
                )
                return

        self._known_limits.add(new_limit)
        sys.setrecursionlimit(new_limit)

    def enter_context(self, new_limit: int, *, current_limit: int) -> None:
        if not self._active_contexts:
            # First context on the stack: remember the true original limit
            # so the final exit_context can restore it.
            assert self._original_limit is None
            self._original_limit = current_limit
            self._known_limits.add(current_limit)

        self._active_contexts += 1
        self._setrecursionlimit(new_limit)

    def exit_context(self, new_limit: int, *, check: bool = True) -> None:
        assert self._active_contexts > 0
        self._active_contexts -= 1

        if self._active_contexts:
            # Still nested: drop back to the limit the next-outer context set.
            self._setrecursionlimit(new_limit, check=check)
            return

        # Last context out: restore the true original limit and reset the
        # known-limit set, keeping only the limit we just restored.
        restored = self._original_limit
        assert restored is not None
        try:
            self._setrecursionlimit(restored, check=check)
        finally:
            self._original_limit = None
            self._known_limits = {restored}
360
361
# Module-wide singleton limiter plus a lock serializing all recursion-limit
# bookkeeping, so concurrent ensure_free_stackframes contexts cooperate.
_stackframe_limiter = StackframeLimiter()
_stackframe_limiter_lock = Lock()
364
365
class ensure_free_stackframes:
    """Context manager that ensures there are at least N free stackframes (for
    a reasonable value of N).
    """

    def __enter__(self) -> None:
        cur_depth = stack_depth_of_caller()
        with _stackframe_limiter_lock:
            # Record the limit in effect at entry; __exit__ hands it back to
            # the shared limiter for restoration.
            self.old_limit = sys.getrecursionlimit()
            # The default CPython recursionlimit is 1000, but pytest seems to bump
            # it to 3000 during test execution. Let's make it something reasonable:
            self.new_limit = cur_depth + 2000
            # Because we add to the recursion limit, to be good citizens we also
            # add a check for unbounded recursion. The default limit is typically
            # 1000/3000, so this can only ever trigger if something really strange
            # is happening and it's hard to imagine an
            # intentionally-deeply-recursive use of this code.
            assert cur_depth <= 1000, (
                "Hypothesis would usually add %d to the stack depth of %d here, "
                "but we are already much deeper than expected. Aborting now, to "
                "avoid extending the stack limit in an infinite loop..."
                % (self.new_limit - self.old_limit, self.old_limit)
            )
            try:
                _stackframe_limiter.enter_context(
                    self.new_limit, current_limit=self.old_limit
                )
            except Exception:
                # if the stackframe limiter raises a HypothesisWarning (under eg
                # -Werror), __exit__ is not called, since we errored in __enter__.
                # Preserve the state of the stackframe limiter by exiting, and
                # avoid showing a duplicate warning with check=False.
                _stackframe_limiter.exit_context(self.old_limit, check=False)
                raise

    def __exit__(self, *args, **kwargs):
        # Hand the entry-time limit back to the shared limiter; it restores
        # the true original limit once the last nested context exits.
        with _stackframe_limiter_lock:
            _stackframe_limiter.exit_context(self.old_limit)
404
405
def find_integer(f: Callable[[int], bool]) -> int:
    """Finds a (hopefully large) integer such that f(n) is True and f(n + 1) is
    False.

    f(0) is assumed to be True and will not be checked.
    """
    # Linear scan over the small numbers first: when the answer is tiny,
    # anything cleverer would cost more calls than it saves (e.g. probing
    # 2 first when the answer is 0 doubles the work).
    for candidate in range(1, 5):
        if not f(candidate):
            return candidate - 1

    # f(4) holds. Probe exponentially upwards until we find a failing value.
    # Invariants from here on: f(lo) is known True, f(hi) is known False
    # (once the probe loop exits).
    lo, hi = 4, 5
    while f(hi):
        lo, hi = hi, hi * 2

    # Binary search the gap down to adjacent values: then f(lo) holds and
    # f(lo + 1) does not, which is exactly the answer.
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if f(mid):
            lo = mid
        else:
            hi = mid
    return lo
442
443
class NotFound(Exception):
    """Raised by SelfOrganisingList.find when no value satisfies the
    given condition."""
446
447
class SelfOrganisingList(Generic[T]):
    """A self-organising list with the move-to-front heuristic.

    A self-organising list is a collection from which we want to retrieve
    items satisfying some predicate. Arbitrary predicates force a linear
    scan, but scan cost varies wildly: finding a match on the first probe
    is O(1). A self-organising list reorders itself to make that lucky
    case as common as possible.

    Of the various possible heuristics we use the simplest: whenever an
    item is found it is moved to the "front" — which in this
    implementation is the *back* of the list, because we scan in reverse.
    """

    def __init__(self, values: Iterable[T] = ()) -> None:
        self.__values = list(values)

    def __repr__(self) -> str:
        return f"SelfOrganisingList({self.__values!r})"

    def add(self, value: T) -> None:
        """Add a value to this list."""
        self.__values.append(value)

    def find(self, condition: Callable[[T], bool]) -> T:
        """Returns some value in this list such that ``condition(value)`` is
        True. If no such value exists raises ``NotFound``."""
        values = self.__values
        index = len(values) - 1
        while index >= 0:
            candidate = values[index]
            if condition(candidate):
                # Move-to-front: relocate the hit to the end, where the
                # next scan will start.
                del values[index]
                values.append(candidate)
                return candidate
            index -= 1
        raise NotFound("No values satisfying condition")
485
486
# True once the gc timing callback/hook has been installed; done lazily on
# the first call to gc_cumulative_time.
_gc_initialized = False
# perf_counter timestamp of the most recent "start" gc callback; 0 signals
# "no matching start seen".
_gc_start: float = 0
# Total seconds spent in garbage collection since instrumentation began.
_gc_cumulative_time: float = 0

# Since gc_callback potentially runs in test context, and perf_counter
# might be monkeypatched, we store a reference to the real one.
_perf_counter = time.perf_counter
494
495
def gc_cumulative_time() -> float:
    """Return the total time in seconds spent in garbage collection since
    instrumentation was installed.

    On the first call this installs a CPython gc callback (or PyPy gc
    hooks) that accumulates gc durations into _gc_cumulative_time; until
    that first call the returned value stays 0.
    """
    global _gc_initialized

    # I don't believe we need a lock for the _gc_cumulative_time increment here,
    # since afaik each gc callback is only executed once when the garbage collector
    # runs, by the thread which initiated the gc.

    if not _gc_initialized:
        if hasattr(gc, "callbacks"):
            # CPython
            def gc_callback(
                phase: Literal["start", "stop"], info: dict[str, int]
            ) -> None:
                global _gc_start, _gc_cumulative_time
                try:
                    now = _perf_counter()
                    if phase == "start":
                        _gc_start = now
                    elif phase == "stop" and _gc_start > 0:
                        _gc_cumulative_time += now - _gc_start  # pragma: no cover # ??
                except RecursionError:  # pragma: no cover
                    # Avoid flakiness via UnraisableException, which is caught and
                    # warned by pytest. The actual callback (this function) is
                    # validated to never trigger a RecursionError itself when
                    # when called by gc.collect.
                    # Anyway, we should hit the same error on "start"
                    # and "stop", but to ensure we don't get out of sync we just
                    # signal that there is no matching start.
                    _gc_start = 0
                    return

            # Insert at the front so our timing brackets other callbacks' work.
            gc.callbacks.insert(0, gc_callback)
        elif hasattr(gc, "hooks"):  # pragma: no cover  # pypy only
            # PyPy
            def hook(stats: Any) -> None:
                global _gc_cumulative_time
                try:
                    _gc_cumulative_time += stats.duration
                except RecursionError:
                    pass

            # Only install where no hook is present, so we never clobber a
            # hook someone else registered.
            if gc.hooks.on_gc_minor is None:
                gc.hooks.on_gc_minor = hook
            if gc.hooks.on_gc_collect_step is None:
                gc.hooks.on_gc_collect_step = hook

        _gc_initialized = True

    return _gc_cumulative_time
545
546
def startswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if the sequence l1 begins with the elements of l2,
    compared element-wise (so mixed sequence types still match)."""
    if len(l2) > len(l1):
        return False
    # zip stops at the shorter input, i.e. after exactly len(l2) pairs.
    return all(a == b for a, b in zip(l2, l1))
551
552
def endswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if the sequence l1 ends with the elements of l2,
    compared element-wise (so mixed sequence types still match)."""
    offset = len(l1) - len(l2)
    if offset < 0:
        return False
    # Compare the tail of l1 against l2, element by element.
    return all(l1[offset + idx] == v for idx, v in enumerate(l2))
557
558
def bits_to_bytes(n: int) -> int:
    """Number of bytes needed to hold an n-bit value.

    Identical to ``(n + 7) // 8``; the shift form measures slightly
    faster, and this helper is called often enough for that to matter."""
    return (n + 7) >> 3