Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/junkdrawer.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

275 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""A module for miscellaneous useful bits and bobs that don't 

12obviously belong anywhere else. If you spot a better home for 

13anything that lives here, please move it.""" 

14 

15import array 

16import gc 

17import itertools 

18import sys 

19import time 

20import warnings 

21from array import ArrayType 

22from collections.abc import Callable, Iterable, Iterator, Sequence 

23from threading import Lock 

24from typing import ( 

25 Any, 

26 ClassVar, 

27 Generic, 

28 Literal, 

29 TypeVar, 

30 Union, 

31 overload, 

32) 

33 

34from sortedcontainers import SortedList 

35 

36from hypothesis.errors import HypothesisWarning 

37 

# Generic element type shared by the sequence helpers in this module.
T = TypeVar("T")

39 

40 

def replace_all(
    ls: Sequence[T],
    replacements: Iterable[tuple[int, int, Sequence[T]]],
) -> list[T]:
    """Substitute multiple replacement values into a list.

    Replacements is a list of (start, end, value) triples.
    """
    out: list[T] = []
    cursor = 0
    length_delta = 0
    for start, end, value in replacements:
        # Copy the untouched stretch before this replacement, then the
        # replacement itself, and skip past the replaced region.
        out.extend(ls[cursor:start])
        out.extend(value)
        cursor = end
        length_delta += len(value) - (end - start)
    out.extend(ls[cursor:])
    assert len(out) == len(ls) + length_delta
    return out

61 

62 

class IntList(Sequence[int]):
    """Class for storing a list of non-negative integers compactly.

    We store them as the smallest size integer array we can get
    away with. When we try to add an integer that is too large,
    we upgrade the array to the smallest word size needed to store
    the new value."""

    # Unsigned array typecodes in increasing width; the final "O" means
    # "fall back to a plain Python list of objects" for arbitrary ints.
    ARRAY_CODES: ClassVar[list[str]] = ["B", "H", "I", "L", "Q", "O"]
    # Maps each typecode to the next wider one; consumed by __upgrade.
    NEXT_ARRAY_CODE: ClassVar[dict[str, str]] = dict(itertools.pairwise(ARRAY_CODES))

    __slots__ = ("__underlying",)

    def __init__(self, values: Sequence[int] = ()):
        # Try each storage width in turn: array.array raises OverflowError
        # when a value does not fit (or is negative for these unsigned codes).
        for code in self.ARRAY_CODES:
            try:
                underlying = self._array_or_list(code, values)
                break
            except OverflowError:
                pass
        else:  # pragma: no cover
            raise AssertionError(f"Could not create storage for {values!r}")
        if isinstance(underlying, list):
            # The list fallback does no validation of its own, so enforce the
            # non-negative-integer invariant explicitly.
            for v in underlying:
                if not isinstance(v, int) or v < 0:
                    raise ValueError(f"Could not create IntList for {values!r}")
        self.__underlying: list[int] | ArrayType[int] = underlying

    @classmethod
    def of_length(cls, n: int) -> "IntList":
        # Build an IntList of n zeros using the narrowest storage.
        return cls(array.array("B", [0]) * n)

    @staticmethod
    def _array_or_list(
        code: str, contents: Iterable[int]
    ) -> Union[list[int], "ArrayType[int]"]:
        # "O" is not a real array typecode; it selects the list fallback.
        if code == "O":
            return list(contents)
        return array.array(code, contents)

    def count(self, value: int) -> int:
        return self.__underlying.count(value)

    def __repr__(self) -> str:
        return f"IntList({list(self.__underlying)!r})"

    def __len__(self) -> int:
        return len(self.__underlying)

    @overload
    def __getitem__(self, i: int) -> int: ...  # pragma: no cover

    @overload
    def __getitem__(
        self, i: slice
    ) -> "list[int] | ArrayType[int]": ...  # pragma: no cover

    def __getitem__(self, i: int | slice) -> "int | list[int] | ArrayType[int]":
        return self.__underlying[i]

    def __delitem__(self, i: int | slice) -> None:
        del self.__underlying[i]

    def insert(self, i: int, v: int) -> None:
        self.__underlying.insert(i, v)

    def __iter__(self) -> Iterator[int]:
        return iter(self.__underlying)

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, IntList):
            return NotImplemented
        return self.__underlying == other.__underlying

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, IntList):
            return NotImplemented
        return self.__underlying != other.__underlying

    def append(self, n: int) -> None:
        # try the fast path of appending n first. If this overflows, use the
        # __setitem__ path, which will upgrade the underlying array.
        try:
            self.__underlying.append(n)
        except OverflowError:
            i = len(self.__underlying)
            self.__underlying.append(0)
            self[i] = n

    def __setitem__(self, i: int, n: int) -> None:
        # Keep widening the storage until the value fits at index i.
        while True:
            try:
                self.__underlying[i] = n
                return
            except OverflowError:
                assert n > 0
                self.__upgrade()

    def extend(self, ls: Iterable[int]) -> None:
        for n in ls:
            self.append(n)

    def __upgrade(self) -> None:
        # Re-copy the contents into the next wider typecode (or a list).
        assert isinstance(self.__underlying, array.array)
        code = self.NEXT_ARRAY_CODE[self.__underlying.typecode]
        self.__underlying = self._array_or_list(code, self.__underlying)

173 

174 

def binary_search(lo: int, hi: int, f: Callable[[int], bool]) -> int:
    """Binary searches in [lo , hi) to find
    n such that f(n) == f(lo) but f(n + 1) != f(lo).
    It is implicitly assumed and will not be checked
    that f(hi) != f(lo).
    """
    # Everything compares against f at the low end of the range.
    target = f(lo)

    left, right = lo, hi
    # Invariant: f(left) == target and (right == hi or f(right) != target).
    while right - left > 1:
        midpoint = (left + right) // 2
        if f(midpoint) == target:
            left = midpoint
        else:
            right = midpoint
    return left

191 

192 

class LazySequenceCopy(Generic[T]):
    """A "copy" of a sequence that works by inserting a mask in front
    of the underlying sequence, so that you can mutate it without changing
    the underlying sequence. Effectively behaves as if you could do list(x)
    in O(1) time. The full list API is not supported yet but there's no reason
    in principle it couldn't be."""

    def __init__(self, values: Sequence[T]):
        self.__values = values
        self.__len = len(values)
        # Lazily-created map from underlying index to overwritten value, so
        # read-only copies allocate nothing extra.
        self.__mask: dict[int, T] | None = None
        # Lazily-created sorted collection of underlying indices removed by
        # pop(); kept sorted so __underlying_index can skip over them.
        self.__popped_indices: SortedList[int] | None = None

    def __len__(self) -> int:
        if self.__popped_indices is None:
            return self.__len
        return self.__len - len(self.__popped_indices)

    def pop(self, i: int = -1) -> T:
        """Remove and return the element at position ``i`` (default: last).

        Raises IndexError if the list is empty or ``i`` is out of range.
        """
        if len(self) == 0:
            raise IndexError("Cannot pop from empty list")
        i = self.__underlying_index(i)

        # Check mask membership explicitly rather than using .pop(i, None)
        # with a None default, so that a masked value which is itself None is
        # returned correctly instead of falling through to the underlying
        # sequence.
        if self.__mask is not None and i in self.__mask:
            v = self.__mask.pop(i)
        else:
            v = self.__values[i]

        if self.__popped_indices is None:
            self.__popped_indices = SortedList()
        self.__popped_indices.add(i)
        return v

    def swap(self, i: int, j: int) -> None:
        """Swap the elements ls[i], ls[j]."""
        if i == j:
            return
        self[i], self[j] = self[j], self[i]

    def __getitem__(self, i: int) -> T:
        i = self.__underlying_index(i)

        default = self.__values[i]
        if self.__mask is None:
            return default
        else:
            return self.__mask.get(i, default)

    def __setitem__(self, i: int, v: T) -> None:
        i = self.__underlying_index(i)
        if self.__mask is None:
            self.__mask = {}
        self.__mask[i] = v

    def __underlying_index(self, i: int) -> int:
        """Translate a logical index (possibly negative) into an index into
        the underlying sequence, accounting for popped elements."""
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [0, {n})")
        if i < 0:
            i += n
        assert 0 <= i < n

        if self.__popped_indices is not None:
            # given an index i in the popped representation of the list, compute
            # its corresponding index in the underlying list. given
            # l = [1, 4, 2, 10, 188]
            # l.pop(3)
            # l.pop(1)
            # assert l == [1, 2, 188]
            #
            # we want l[i] == self.__values[f(i)], where f is this function.
            assert len(self.__popped_indices) <= len(self.__values)

            # Each popped index at or before i shifts the mapping right by one.
            for idx in self.__popped_indices:
                if idx > i:
                    break
                i += 1
        return i

    # even though we have len + getitem, mypyc requires iter.
    def __iter__(self) -> Iterator[T]:
        for i in range(len(self)):
            yield self[i]

277 

278 

def stack_depth_of_caller() -> int:
    """Get stack size for caller's frame.

    From https://stackoverflow.com/a/47956089/9297601 , this is a simple
    but much faster alternative to `len(inspect.stack(0))`. We use it
    with get/set recursionlimit to make stack overflows non-flaky; see
    https://github.com/HypothesisWorks/hypothesis/issues/2494 for details.
    """
    # Start two frames up (our caller's caller) and walk the f_back chain,
    # counting frames until we fall off the bottom of the stack.
    depth = 1
    frame = sys._getframe(2)
    while frame is not None:
        depth += 1
        frame = frame.f_back
    return depth

293 

294 

class StackframeLimiter:
    # StackframeLimiter is used to make the recursion limit warning issued via
    # ensure_free_stackframes thread-safe. We track the known values we have
    # passed to sys.setrecursionlimit in _known_limits, and only issue a warning
    # if sys.getrecursionlimit is not in _known_limits.
    #
    # This will always be an under-approximation of when we would ideally issue
    # this warning, since a non-hypothesis caller could coincidentaly set the
    # recursion limit to one of our known limits. Currently, StackframeLimiter
    # resets _known_limits whenever all of the ensure_free_stackframes contexts
    # have exited. We could increase the power of the warning by tracking a
    # refcount for each limit, and removing it as soon as the refcount hits zero.
    # I didn't think this extra complexity is worth the minor power increase for
    # what is already only a "nice to have" warning.

    def __init__(self):
        # Number of ensure_free_stackframes contexts currently active.
        self._active_contexts = 0
        # Every limit we have passed to sys.setrecursionlimit since the first
        # context entered; used to detect third-party limit changes.
        self._known_limits: set[int] = set()
        # The recursion limit in effect before the first context entered,
        # or None while no context is active.
        self._original_limit: int | None = None

    def _setrecursionlimit(self, new_limit: int, *, check: bool = True) -> None:
        """Set the recursion limit, unless ``check`` finds that the current
        limit is not one we set ourselves — then warn and do nothing."""
        if (
            check
            and (current_limit := sys.getrecursionlimit()) not in self._known_limits
        ):
            warnings.warn(
                "The recursion limit will not be reset, since it was changed "
                f"during test execution (from {self._original_limit} to {current_limit}).",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        self._known_limits.add(new_limit)
        sys.setrecursionlimit(new_limit)

    def enter_context(self, new_limit: int, *, current_limit: int) -> None:
        """Register one more active context and raise the limit to new_limit."""
        if self._active_contexts == 0:
            # this is the first context on the stack. Record the true original
            # limit, to restore later.
            assert self._original_limit is None
            self._original_limit = current_limit
            self._known_limits.add(self._original_limit)

        self._active_contexts += 1
        self._setrecursionlimit(new_limit)

    def exit_context(self, new_limit: int, *, check: bool = True) -> None:
        """Unregister one context: restore the original limit when the last
        context exits, otherwise fall back to ``new_limit``."""
        assert self._active_contexts > 0
        self._active_contexts -= 1

        if self._active_contexts == 0:
            # this is the last context to exit. Restore the true original
            # limit and clear our known limits.
            original_limit = self._original_limit
            assert original_limit is not None
            try:
                self._setrecursionlimit(original_limit, check=check)
            finally:
                self._original_limit = None
                # we want to clear the known limits, but preserve the limit
                # we just set it to as known.
                self._known_limits = {original_limit}
        else:
            self._setrecursionlimit(new_limit, check=check)

360 

361 

# Process-wide singleton used by ensure_free_stackframes, plus a lock that
# guards its enter/exit transitions across threads.
_stackframe_limiter = StackframeLimiter()
_stackframe_limiter_lock = Lock()

364 

365 

class ensure_free_stackframes:
    """Context manager that ensures there are at least N free stackframes (for
    a reasonable value of N).
    """

    def __enter__(self) -> None:
        # Measured outside the lock: only inspects this thread's own stack.
        cur_depth = stack_depth_of_caller()
        with _stackframe_limiter_lock:
            self.old_limit = sys.getrecursionlimit()
            # The default CPython recursionlimit is 1000, but pytest seems to bump
            # it to 3000 during test execution. Let's make it something reasonable:
            self.new_limit = cur_depth + 2000
            # Because we add to the recursion limit, to be good citizens we also
            # add a check for unbounded recursion. The default limit is typically
            # 1000/3000, so this can only ever trigger if something really strange
            # is happening and it's hard to imagine an
            # intentionally-deeply-recursive use of this code.
            assert cur_depth <= 1000, (
                "Hypothesis would usually add %d to the stack depth of %d here, "
                "but we are already much deeper than expected. Aborting now, to "
                "avoid extending the stack limit in an infinite loop..."
                % (self.new_limit - self.old_limit, self.old_limit)
            )
            try:
                _stackframe_limiter.enter_context(
                    self.new_limit, current_limit=self.old_limit
                )
            except Exception:
                # if the stackframe limiter raises a HypothesisWarning (under eg
                # -Werror), __exit__ is not called, since we errored in __enter__.
                # Preserve the state of the stackframe limiter by exiting, and
                # avoid showing a duplicate warning with check=False.
                _stackframe_limiter.exit_context(self.old_limit, check=False)
                raise

    def __exit__(self, *args, **kwargs):
        with _stackframe_limiter_lock:
            _stackframe_limiter.exit_context(self.old_limit)

404 

405 

def find_integer(f: Callable[[int], bool]) -> int:
    """Finds a (hopefully large) integer such that f(n) is True and f(n + 1) is
    False.

    f(0) is assumed to be True and will not be checked.
    """
    # Check the small cases by hand: when the answer is tiny, any cleverness
    # only adds extra calls to f. (If the answer were 0 and we probed 2 first,
    # we would have done twice the necessary work.)
    for probe in range(1, 5):
        if not f(probe):
            return probe - 1

    # f(4) is now known to hold. Grow an exponential bracket [lower, upper)
    # such that f(lower) holds and f(upper) fails.
    lower = 4
    upper = 5
    while f(upper):
        lower, upper = upper, upper * 2

    # Bisect the bracket down to adjacent integers: then f(lower) holds and
    # f(lower + 1) fails, which is exactly what we want.
    while upper - lower > 1:
        midpoint = (lower + upper) // 2
        if f(midpoint):
            lower = midpoint
        else:
            upper = midpoint
    return lower

442 

443 

class NotFound(Exception):
    # Raised by SelfOrganisingList.find when no stored value satisfies the
    # given condition.
    pass

446 

447 

class SelfOrganisingList(Generic[T]):
    """A self-organising list using the move-to-front heuristic.

    The only lookup supported is "find some element satisfying a predicate",
    which necessarily costs a linear scan in the worst case, since the
    predicates may be arbitrary. To make lucky early hits more common, every
    element we find is moved to the position where scanning begins — the end
    of the underlying list, because we scan in reverse. Elements that are
    requested repeatedly therefore tend to be found quickly.
    """

    def __init__(self, values: Iterable[T] = ()) -> None:
        self.__values = list(values)

    def __repr__(self) -> str:
        return f"SelfOrganisingList({self.__values!r})"

    def add(self, value: T) -> None:
        """Add a value to this list."""
        self.__values.append(value)

    def find(self, condition: Callable[[T], bool]) -> T:
        """Return some value for which ``condition(value)`` is True, moving
        it to the front of the scan order. Raises ``NotFound`` when no stored
        value satisfies ``condition``."""
        stored = self.__values
        for index in reversed(range(len(stored))):
            candidate = stored[index]
            if condition(candidate):
                # Move-to-front: relocate the hit to where scanning starts.
                del stored[index]
                stored.append(candidate)
                return candidate
        raise NotFound("No values satisfying condition")

485 

486 

# Whether the gc timing callback/hook below has been installed yet.
_gc_initialized = False
# perf_counter timestamp of the most recent "start" gc callback, or 0 when
# there is no matching start (CPython only).
_gc_start: float = 0
# Total seconds spent in garbage collection, accumulated by the callbacks
# registered in gc_cumulative_time.
_gc_cumulative_time: float = 0

# Since gc_callback potentially runs in test context, and perf_counter
# might be monkeypatched, we store a reference to the real one.
_perf_counter = time.perf_counter

494 

495 

def gc_cumulative_time() -> float:
    """Return the total time in seconds spent in garbage collection, as
    measured by the callback/hook installed on first call."""
    global _gc_initialized

    # I don't believe we need a lock for the _gc_cumulative_time increment here,
    # since afaik each gc callback is only executed once when the garbage collector
    # runs, by the thread which initiated the gc.

    if not _gc_initialized:
        if hasattr(gc, "callbacks"):
            # CPython
            def gc_callback(
                phase: Literal["start", "stop"], info: dict[str, int]
            ) -> None:
                global _gc_start, _gc_cumulative_time
                try:
                    now = _perf_counter()
                    if phase == "start":
                        _gc_start = now
                    elif phase == "stop" and _gc_start > 0:
                        _gc_cumulative_time += now - _gc_start  # pragma: no cover # ??
                except RecursionError:  # pragma: no cover
                    # Avoid flakiness via UnraisableException, which is caught and
                    # warned by pytest. The actual callback (this function) is
                    # validated to never trigger a RecursionError itself when
                    # when called by gc.collect.
                    # Anyway, we should hit the same error on "start"
                    # and "stop", but to ensure we don't get out of sync we just
                    # signal that there is no matching start.
                    _gc_start = 0
                    return

            # Insert at the front so our timing wraps other callbacks' work.
            gc.callbacks.insert(0, gc_callback)
        elif hasattr(gc, "hooks"):  # pragma: no cover # pypy only
            # PyPy
            def hook(stats: Any) -> None:
                global _gc_cumulative_time
                try:
                    _gc_cumulative_time += stats.duration
                except RecursionError:
                    pass

            # Only claim hook slots that nobody else has taken.
            if gc.hooks.on_gc_minor is None:
                gc.hooks.on_gc_minor = hook
            if gc.hooks.on_gc_collect_step is None:
                gc.hooks.on_gc_collect_step = hook

        _gc_initialized = True

    return _gc_cumulative_time

545 

546 

def startswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if the first len(l2) elements of l1 equal l2."""
    prefix_len = len(l2)
    if len(l1) < prefix_len:
        return False
    return all(a == b for a, b in zip(l1[:prefix_len], l2))

551 

552 

def endswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if the last len(l2) elements of l1 equal l2."""
    suffix_len = len(l2)
    if len(l1) < suffix_len:
        return False
    # Slice from an explicit non-negative start so an empty l2 compares
    # against an empty suffix.
    return all(a == b for a, b in zip(l1[len(l1) - suffix_len :], l2))

557 

558 

def bits_to_bytes(n: int) -> int:
    """Return how many bytes are needed to hold an n-bit number.

    The shift is used deliberately instead of the equivalent (n + 7) // 8:
    this function is called often enough for the small speed difference to
    matter.
    """
    return (n + 7) >> 3