Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/junkdrawer.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

274 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""A module for miscellaneous useful bits and bobs that don't 

12obviously belong anywhere else. If you spot a better home for 

13anything that lives here, please move it.""" 

14 

15import array 

16import gc 

17import sys 

18import time 

19import warnings 

20from array import ArrayType 

21from collections.abc import Iterable, Iterator, Sequence 

22from threading import Lock 

23from typing import ( 

24 Any, 

25 Callable, 

26 ClassVar, 

27 Generic, 

28 Literal, 

29 Optional, 

30 TypeVar, 

31 Union, 

32 overload, 

33) 

34 

35from sortedcontainers import SortedList 

36 

37from hypothesis.errors import HypothesisWarning 

38 

39T = TypeVar("T") 

40 

41 

def replace_all(
    ls: Sequence[T],
    replacements: Iterable[tuple[int, int, Sequence[T]]],
) -> list[T]:
    """Substitute multiple replacement values into a list.

    Replacements is a list of (start, end, value) triples.
    """

    out: list[T] = []
    cursor = 0  # position in ls up to which we have already copied
    delta = 0  # net change in length introduced by the replacements so far
    for start, end, value in replacements:
        # Copy the untouched gap before this replacement, then the value.
        out += ls[cursor:start]
        out += value
        delta += len(value) - (end - start)
        cursor = end
    # Copy whatever remains after the final replacement.
    out += ls[cursor:]
    assert len(out) == len(ls) + delta
    return out

62 

63 

class IntList(Sequence[int]):
    """Compact storage for a sequence of non-negative integers.

    Values live in the smallest-typecode ``array.array`` that can hold
    them all; once a value arrives that no fixed-width code can
    represent, the storage is promoted step by step, ultimately to a
    plain Python list (our sentinel code "O")."""

    ARRAY_CODES: ClassVar[list[str]] = ["B", "H", "I", "L", "Q", "O"]
    # Maps each typecode to the next-larger one to promote to on overflow.
    NEXT_ARRAY_CODE: ClassVar[dict[str, str]] = dict(zip(ARRAY_CODES, ARRAY_CODES[1:]))

    __slots__ = ("__underlying",)

    def __init__(self, values: Sequence[int] = ()):
        storage: Union[list[int], "ArrayType[int]", None] = None
        for code in self.ARRAY_CODES:
            try:
                storage = self._array_or_list(code, values)
            except OverflowError:
                # Too large for this typecode; try the next-wider one.
                continue
            break
        if storage is None:  # pragma: no cover
            raise AssertionError(f"Could not create storage for {values!r}")
        if isinstance(storage, list) and any(
            not isinstance(v, int) or v < 0 for v in storage
        ):
            # The list fallback accepts anything, so validate it by hand:
            # only non-negative ints are allowed in an IntList.
            raise ValueError(f"Could not create IntList for {values!r}")
        self.__underlying: Union[list[int], "ArrayType[int]"] = storage

    @classmethod
    def of_length(cls, n: int) -> "IntList":
        """Return a new IntList consisting of ``n`` zeros."""
        return cls(array.array("B", bytes(n)))

    @staticmethod
    def _array_or_list(
        code: str, contents: Iterable[int]
    ) -> Union[list[int], "ArrayType[int]"]:
        # "O" is our sentinel for "arbitrary Python ints", i.e. a plain list.
        return list(contents) if code == "O" else array.array(code, contents)

    def count(self, value: int) -> int:
        """Return the number of occurrences of ``value``."""
        return self.__underlying.count(value)

    def __repr__(self) -> str:
        return f"IntList({list(self.__underlying)!r})"

    def __len__(self) -> int:
        return len(self.__underlying)

    @overload
    def __getitem__(self, i: int) -> int: ...  # pragma: no cover

    @overload
    def __getitem__(
        self, i: slice
    ) -> Union[list[int], "ArrayType[int]"]: ...  # pragma: no cover

    def __getitem__(
        self, i: Union[int, slice]
    ) -> Union[int, list[int], "ArrayType[int]"]:
        return self.__underlying[i]

    def __delitem__(self, i: Union[int, slice]) -> None:
        del self.__underlying[i]

    def insert(self, i: int, v: int) -> None:
        self.__underlying.insert(i, v)

    def __iter__(self) -> Iterator[int]:
        return iter(self.__underlying)

    def __eq__(self, other: object) -> bool:
        if other is self:
            return True
        if not isinstance(other, IntList):
            return NotImplemented
        return self.__underlying == other.__underlying

    def __ne__(self, other: object) -> bool:
        if other is self:
            return False
        if not isinstance(other, IntList):
            return NotImplemented
        return self.__underlying != other.__underlying

    def append(self, n: int) -> None:
        # Fast path: append directly. If n overflows the current typecode,
        # append a placeholder and go through __setitem__, which promotes
        # the underlying storage as needed.
        try:
            self.__underlying.append(n)
        except OverflowError:
            position = len(self.__underlying)
            self.__underlying.append(0)
            self[position] = n

    def __setitem__(self, i: int, n: int) -> None:
        # Keep promoting the storage until the value fits.
        while True:
            try:
                self.__underlying[i] = n
            except OverflowError:
                assert n > 0
                self.__upgrade()
            else:
                return

    def extend(self, ls: Iterable[int]) -> None:
        for value in ls:
            self.append(value)

    def __upgrade(self) -> None:
        # Promote the storage to the next-larger typecode (or to a list
        # once we pass "Q").
        assert isinstance(self.__underlying, array.array)
        next_code = self.NEXT_ARRAY_CODE[self.__underlying.typecode]
        self.__underlying = self._array_or_list(next_code, self.__underlying)

176 

177 

def binary_search(lo: int, hi: int, f: Callable[[int], bool]) -> int:
    """Binary searches in [lo , hi) to find
    n such that f(n) == f(lo) but f(n + 1) != f(lo).
    It is implicitly assumed and will not be checked
    that f(hi) != f(lo).
    """
    # Invariant: f(lo) == target and f(hi) != target throughout.
    target = f(lo)
    while hi - lo > 1:
        midpoint = lo + (hi - lo) // 2
        if f(midpoint) == target:
            lo = midpoint
        else:
            hi = midpoint
    return lo

194 

195 

class LazySequenceCopy(Generic[T]):
    """A "copy" of a sequence that works by inserting a mask in front
    of the underlying sequence, so that you can mutate it without changing
    the underlying sequence. Effectively behaves as if you could do list(x)
    in O(1) time. The full list API is not supported yet but there's no reason
    in principle it couldn't be."""

    def __init__(self, values: Sequence[T]):
        self.__values = values
        self.__len = len(values)
        # Maps underlying indices to their overriding values, lazily created
        # on first write.
        self.__mask: Optional[dict[int, T]] = None
        # Underlying indices removed by pop(), kept sorted so that
        # __underlying_index can translate external indices to internal ones.
        self.__popped_indices: Optional[SortedList[int]] = None

    def __len__(self) -> int:
        if self.__popped_indices is None:
            return self.__len
        return self.__len - len(self.__popped_indices)

    def pop(self, i: int = -1) -> T:
        """Remove and return the element at index ``i`` (the last element
        by default).

        Raises IndexError if the (logical) list is empty.
        """
        if len(self) == 0:
            raise IndexError("Cannot pop from empty list")
        i = self.__underlying_index(i)

        # Test membership explicitly instead of `self.__mask.pop(i, None)`
        # followed by an `is None` check, so that a masked value which is
        # itself None is returned correctly rather than silently falling
        # back to the underlying sequence.
        if self.__mask is not None and i in self.__mask:
            v = self.__mask.pop(i)
        else:
            v = self.__values[i]

        if self.__popped_indices is None:
            self.__popped_indices = SortedList()
        self.__popped_indices.add(i)
        return v

    def swap(self, i: int, j: int) -> None:
        """Swap the elements ls[i], ls[j]."""
        if i == j:
            return
        self[i], self[j] = self[j], self[i]

    def __getitem__(self, i: int) -> T:
        i = self.__underlying_index(i)

        default = self.__values[i]
        if self.__mask is None:
            return default
        else:
            return self.__mask.get(i, default)

    def __setitem__(self, i: int, v: T) -> None:
        # Writes only ever touch the mask, never the underlying sequence.
        i = self.__underlying_index(i)
        if self.__mask is None:
            self.__mask = {}
        self.__mask[i] = v

    def __underlying_index(self, i: int) -> int:
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [0, {n})")
        if i < 0:
            i += n
        assert 0 <= i < n

        if self.__popped_indices is not None:
            # given an index i in the popped representation of the list, compute
            # its corresponding index in the underlying list. given
            #   l = [1, 4, 2, 10, 188]
            #   l.pop(3)
            #   l.pop(1)
            #   assert l == [1, 2, 188]
            #
            # we want l[i] == self.__values[f(i)], where f is this function.
            assert len(self.__popped_indices) <= len(self.__values)

            # Each popped index at or before position i shifts the mapping
            # right by one.
            for idx in self.__popped_indices:
                if idx > i:
                    break
                i += 1
        return i

    # even though we have len + getitem, mypyc requires iter.
    def __iter__(self) -> Iterator[T]:
        for i in range(len(self)):
            yield self[i]

280 

281 

def stack_depth_of_caller() -> int:
    """Get stack size for caller's frame.

    From https://stackoverflow.com/a/47956089/9297601 , this is a simple
    but much faster alternative to `len(inspect.stack(0))`. We use it
    with get/set recursionlimit to make stack overflows non-flaky; see
    https://github.com/HypothesisWorks/hypothesis/issues/2494 for details.
    """
    # Start two frames up (our caller's caller) and walk to the bottom of
    # the stack, counting frames as we go.
    depth = 1
    frame = sys._getframe(2)
    while frame is not None:
        depth += 1
        frame = frame.f_back  # type: ignore[assignment]
    return depth

296 

297 

class StackframeLimiter:
    # StackframeLimiter makes the recursion-limit warning issued via
    # ensure_free_stackframes thread-safe. Every limit we ourselves pass to
    # sys.setrecursionlimit is recorded in _known_limits, and a warning is
    # only issued when sys.getrecursionlimit() returns a value we never set
    # — i.e. when user code changed the limit behind our back.
    #
    # This under-approximates the ideal warning condition, since unrelated
    # code could coincidentally set the recursion limit to a value we know
    # about. _known_limits is reset once every ensure_free_stackframes
    # context has exited. Tracking a per-limit refcount would make the
    # warning slightly more precise, but that extra complexity doesn't seem
    # worth it for what is only a "nice to have" warning.

    def __init__(self):
        self._active_contexts = 0
        self._known_limits: set[int] = set()
        self._original_limit: Optional[int] = None

    def _setrecursionlimit(self, new_limit: int, *, check: bool = True) -> None:
        # Set the limit, warning (and bailing out) if someone else has
        # changed it to a value we never installed.
        if check and sys.getrecursionlimit() not in self._known_limits:
            warnings.warn(
                "The recursion limit will not be reset, since it was changed "
                "during test execution.",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        self._known_limits.add(new_limit)
        sys.setrecursionlimit(new_limit)

    def enter_context(self, new_limit: int, *, current_limit: int) -> None:
        if not self._active_contexts:
            # First context on the stack: remember the true original limit
            # so we can restore it when the last context exits.
            assert self._original_limit is None
            self._original_limit = current_limit
            self._known_limits.add(current_limit)

        self._active_contexts += 1
        self._setrecursionlimit(new_limit)

    def exit_context(self, new_limit: int, *, check: bool = True) -> None:
        assert self._active_contexts > 0
        self._active_contexts -= 1

        if self._active_contexts:
            # Still nested inside other contexts: just fall back to the
            # limit the caller wants.
            self._setrecursionlimit(new_limit, check=check)
            return

        # Last context on the stack: restore the true original limit and
        # reset our bookkeeping.
        restored = self._original_limit
        assert restored is not None
        try:
            self._setrecursionlimit(restored, check=check)
        finally:
            self._original_limit = None
            # Forget all known limits except the one we just installed.
            self._known_limits = {restored}

360 

361 

# Process-wide singleton used by ensure_free_stackframes, plus the lock that
# serialises all recursion-limit mutations made through it.
_stackframe_limiter = StackframeLimiter()
_stackframe_limiter_lock = Lock()

364 

365 

class ensure_free_stackframes:
    """Context manager that ensures there are at least N free stackframes (for
    a reasonable value of N).

    On entry, raises the interpreter recursion limit to the current stack
    depth plus a fixed headroom via the process-wide StackframeLimiter; on
    exit, hands the previous limit back to the limiter, which restores the
    true original limit once the outermost context exits. All limit changes
    happen under _stackframe_limiter_lock.
    """

    def __enter__(self) -> None:
        cur_depth = stack_depth_of_caller()
        with _stackframe_limiter_lock:
            self.old_limit = sys.getrecursionlimit()
            # The default CPython recursionlimit is 1000, but pytest seems to bump
            # it to 3000 during test execution. Let's make it something reasonable:
            self.new_limit = cur_depth + 2000
            # Because we add to the recursion limit, to be good citizens we also
            # add a check for unbounded recursion. The default limit is typically
            # 1000/3000, so this can only ever trigger if something really strange
            # is happening and it's hard to imagine an
            # intentionally-deeply-recursive use of this code.
            assert cur_depth <= 1000, (
                "Hypothesis would usually add %d to the stack depth of %d here, "
                "but we are already much deeper than expected. Aborting now, to "
                "avoid extending the stack limit in an infinite loop..."
                % (self.new_limit - self.old_limit, self.old_limit)
            )
            try:
                _stackframe_limiter.enter_context(
                    self.new_limit, current_limit=self.old_limit
                )
            except Exception:
                # if the stackframe limiter raises a HypothesisWarning (under eg
                # -Werror), __exit__ is not called, since we errored in __enter__.
                # Preserve the state of the stackframe limiter by exiting, and
                # avoid showing a duplicate warning with check=False.
                _stackframe_limiter.exit_context(self.old_limit, check=False)
                raise

    def __exit__(self, *args, **kwargs):
        # Restore under the same lock that guards all limiter mutations.
        with _stackframe_limiter_lock:
            _stackframe_limiter.exit_context(self.old_limit)

404 

405 

def find_integer(f: Callable[[int], bool]) -> int:
    """Finds a (hopefully large) integer such that f(n) is True and f(n + 1) is
    False.

    f(0) is assumed to be True and will not be checked.
    """
    # Linear scan over the tiny values first, and only get clever once f(4)
    # holds. For small answers this is optimal: probing 2 first when the
    # answer is 0 would double the work for nothing.
    for candidate in range(1, 5):
        if not f(candidate):
            return candidate - 1

    # f(4) holds. Probe exponentially upwards until f fails, maintaining:
    #   lo = the largest value known to satisfy f,
    #   hi = the smallest value known to fail f (once found).
    lo, hi = 4, 5
    while f(hi):
        lo, hi = hi, hi * 2

    # Binary search the remaining gap down to lo + 1 == hi, at which point
    # f(lo) holds and f(lo + 1) does not, as desired.
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if f(mid):
            lo = mid
        else:
            hi = mid
    return lo

442 

443 

class NotFound(Exception):
    """Raised by SelfOrganisingList.find when no element satisfies the
    given predicate."""

    pass

446 

447 

class SelfOrganisingList(Generic[T]):
    """A self-organising list with the move-to-front heuristic.

    A self-organising list is a collection from which we repeatedly want to
    retrieve items satisfying some predicate. Since the predicates may be
    arbitrary, nothing beats a linear scan in the worst case — but a linear
    scan is O(1) whenever the first item we try happens to match, so the
    idea is to reorder the list to make that lucky case as common as
    possible.

    Of the various heuristics available we use the simplest: whenever an
    item is found it is moved to the "front". Because we scan in reverse,
    the "front" is actually the back of the underlying list.
    """

    def __init__(self, values: Iterable[T] = ()) -> None:
        self.__values = list(values)

    def __repr__(self) -> str:
        return f"SelfOrganisingList({self.__values!r})"

    def add(self, value: T) -> None:
        """Add a value to this list."""
        self.__values.append(value)

    def find(self, condition: Callable[[T], bool]) -> T:
        """Returns some value in this list such that ``condition(value)`` is
        True. If no such value exists raises ``NotFound``."""
        values = self.__values
        # Scan from the back (the "front" of the heuristic ordering).
        for position in reversed(range(len(values))):
            candidate = values[position]
            if condition(candidate):
                # Move-to-front: relocate the hit to the end of the list.
                del values[position]
                values.append(candidate)
                return candidate
        raise NotFound("No values satisfying condition")

485 

486 

# State for gc_cumulative_time(): _gc_initialized flags whether the GC
# callbacks/hooks have been installed yet; _gc_start records the
# perf_counter value at the start of the current collection (0 means
# "no matching start"); _gc_cumulative_time accumulates the total seconds
# spent in GC.
_gc_initialized = False
_gc_start: float = 0
_gc_cumulative_time: float = 0

# Since gc_callback potentially runs in test context, and perf_counter
# might be monkeypatched, we store a reference to the real one.
_perf_counter = time.perf_counter

494 

495 

def gc_cumulative_time() -> float:
    """Return the total time (in seconds) this process has spent in garbage
    collection since the first call to this function.

    The first call lazily installs a gc callback (CPython) or gc hooks
    (PyPy) which accumulate collection time into the module-level
    ``_gc_cumulative_time``; subsequent calls just read that total.
    """
    global _gc_initialized

    # I don't believe we need a lock for the _gc_cumulative_time increment here,
    # since afaik each gc callback is only executed once when the garbage collector
    # runs, by the thread which initiated the gc.

    if not _gc_initialized:
        if hasattr(gc, "callbacks"):
            # CPython
            def gc_callback(
                phase: Literal["start", "stop"], info: dict[str, int]
            ) -> None:
                global _gc_start, _gc_cumulative_time
                try:
                    now = _perf_counter()
                    if phase == "start":
                        _gc_start = now
                    elif phase == "stop" and _gc_start > 0:
                        # _gc_start > 0 guards against a "stop" with no
                        # matching "start" (see the RecursionError handler).
                        _gc_cumulative_time += now - _gc_start  # pragma: no cover # ??
                except RecursionError:  # pragma: no cover
                    # Avoid flakiness via UnraisableException, which is caught and
                    # warned by pytest. The actual callback (this function) is
                    # validated to never trigger a RecursionError itself when
                    # called by gc.collect.
                    # Anyway, we should hit the same error on "start"
                    # and "stop", but to ensure we don't get out of sync we just
                    # signal that there is no matching start.
                    _gc_start = 0
                    return

            # Insert at the front so we time the collection itself, not
            # whatever other callbacks are registered.
            gc.callbacks.insert(0, gc_callback)
        elif hasattr(gc, "hooks"):  # pragma: no cover  # pypy only
            # PyPy
            def hook(stats: Any) -> None:
                global _gc_cumulative_time
                try:
                    _gc_cumulative_time += stats.duration
                except RecursionError:
                    pass

            # Only install our hooks if nobody else has claimed the slots.
            if gc.hooks.on_gc_minor is None:
                gc.hooks.on_gc_minor = hook
            if gc.hooks.on_gc_collect_step is None:
                gc.hooks.on_gc_collect_step = hook

        _gc_initialized = True

    return _gc_cumulative_time

545 

546 

def startswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if ``l1`` begins with the elements of ``l2``.

    An empty ``l2`` is a prefix of everything.
    """
    if len(l1) < len(l2):
        return False
    # zip() stops at the shorter argument (l2, given the length check above),
    # so this compares exactly the first len(l2) elements without the extra
    # copy that slicing l1 would make.
    return all(v1 == v2 for v1, v2 in zip(l1, l2))

551 

552 

def endswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if ``l1`` ends with the elements of ``l2``.

    An empty ``l2`` is a suffix of everything.
    """
    if len(l1) < len(l2):
        return False
    # Compare in place against the tail of l1 instead of materialising the
    # l1[-len(l2):] slice copy.
    offset = len(l1) - len(l2)
    return all(l1[offset + k] == v for k, v in enumerate(l2))

557 

558 

def bits_to_bytes(n: int) -> int:
    """The number of bytes required to represent an n-bit number.

    Equivalent to (n + 7) // 8, but slightly faster, and this really is
    called enough times that that matters — don't "simplify" the shift
    back into a division.
    """
    return (n + 7) >> 3