Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/junkdrawer.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

258 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""A module for miscellaneous useful bits and bobs that don't 

12obviously belong anywhere else. If you spot a better home for 

13anything that lives here, please move it.""" 

14 

15import array 

16import gc 

17import itertools 

18import sys 

19import time 

20import warnings 

21from array import ArrayType 

22from collections.abc import Callable, Iterable, Iterator, Sequence 

23from threading import Lock 

24from typing import ( 

25 Any, 

26 ClassVar, 

27 Generic, 

28 Literal, 

29 TypeVar, 

30 Union, 

31 overload, 

32) 

33 

34from sortedcontainers import SortedList 

35 

36from hypothesis.errors import HypothesisWarning 

37 

# Generic element type shared by the sequence helpers in this module.
T = TypeVar("T")

39 

40 

def replace_all(
    ls: Sequence[T],
    replacements: Iterable[tuple[int, int, Sequence[T]]],
) -> list[T]:
    """Substitute multiple replacement values into a list.

    Replacements is a list of (start, end, value) triples.
    """

    out: list[T] = []
    cursor = 0  # index into ls of the first not-yet-copied element
    delta = 0  # net change in length from the replacements applied so far
    for start, end, value in replacements:
        out.extend(ls[cursor:start])
        out.extend(value)
        cursor = end
        delta += len(value) - (end - start)
    out.extend(ls[cursor:])
    assert len(out) == len(ls) + delta
    return out

61 

62 

63class IntList(Sequence[int]): 

64 """Class for storing a list of non-negative integers compactly. 

65 

66 We store them as the smallest size integer array we can get 

67 away with. When we try to add an integer that is too large, 

68 we upgrade the array to the smallest word size needed to store 

69 the new value.""" 

70 

71 ARRAY_CODES: ClassVar[list[str]] = ["B", "H", "I", "L", "Q", "O"] 

72 NEXT_ARRAY_CODE: ClassVar[dict[str, str]] = dict(itertools.pairwise(ARRAY_CODES)) 

73 

74 __slots__ = ("__underlying",) 

75 

76 def __init__(self, values: Sequence[int] = ()): 

77 for code in self.ARRAY_CODES: 

78 try: 

79 underlying = self._array_or_list(code, values) 

80 break 

81 except OverflowError: 

82 pass 

83 else: # pragma: no cover 

84 raise AssertionError(f"Could not create storage for {values!r}") 

85 if isinstance(underlying, list): 

86 for v in underlying: 

87 if not isinstance(v, int) or v < 0: 

88 raise ValueError(f"Could not create IntList for {values!r}") 

89 self.__underlying: list[int] | ArrayType[int] = underlying 

90 

91 @classmethod 

92 def of_length(cls, n: int) -> "IntList": 

93 return cls(array.array("B", [0]) * n) 

94 

95 @staticmethod 

96 def _array_or_list( 

97 code: str, contents: Iterable[int] 

98 ) -> Union[list[int], "ArrayType[int]"]: 

99 if code == "O": 

100 return list(contents) 

101 return array.array(code, contents) 

102 

103 def count(self, value: int) -> int: 

104 return self.__underlying.count(value) 

105 

106 def __repr__(self) -> str: 

107 return f"IntList({list(self.__underlying)!r})" 

108 

109 def __len__(self) -> int: 

110 return len(self.__underlying) 

111 

112 @overload 

113 def __getitem__(self, i: int) -> int: ... # pragma: no cover 

114 

115 @overload 

116 def __getitem__( 

117 self, i: slice 

118 ) -> "list[int] | ArrayType[int]": ... # pragma: no cover 

119 

120 def __getitem__(self, i: int | slice) -> "int | list[int] | ArrayType[int]": 

121 return self.__underlying[i] 

122 

123 def __delitem__(self, i: int | slice) -> None: 

124 del self.__underlying[i] 

125 

126 def insert(self, i: int, v: int) -> None: 

127 self.__underlying.insert(i, v) 

128 

129 def __iter__(self) -> Iterator[int]: 

130 return iter(self.__underlying) 

131 

132 def __eq__(self, other: object) -> bool: 

133 if self is other: 

134 return True 

135 if not isinstance(other, IntList): 

136 return NotImplemented 

137 return self.__underlying == other.__underlying 

138 

139 def __ne__(self, other: object) -> bool: 

140 if self is other: 

141 return False 

142 if not isinstance(other, IntList): 

143 return NotImplemented 

144 return self.__underlying != other.__underlying 

145 

146 def append(self, n: int) -> None: 

147 # try the fast path of appending n first. If this overflows, use the 

148 # __setitem__ path, which will upgrade the underlying array. 

149 try: 

150 self.__underlying.append(n) 

151 except OverflowError: 

152 i = len(self.__underlying) 

153 self.__underlying.append(0) 

154 self[i] = n 

155 

156 def __setitem__(self, i: int, n: int) -> None: 

157 while True: 

158 try: 

159 self.__underlying[i] = n 

160 return 

161 except OverflowError: 

162 assert n > 0 

163 self.__upgrade() 

164 

165 def extend(self, ls: Iterable[int]) -> None: 

166 for n in ls: 

167 self.append(n) 

168 

169 def __upgrade(self) -> None: 

170 assert isinstance(self.__underlying, array.array) 

171 code = self.NEXT_ARRAY_CODE[self.__underlying.typecode] 

172 self.__underlying = self._array_or_list(code, self.__underlying) 

173 

174 

def binary_search(lo: int, hi: int, f: Callable[[int], bool]) -> int:
    """Binary searches in [lo , hi) to find
    n such that f(n) == f(lo) but f(n + 1) != f(lo).
    It is implicitly assumed and will not be checked
    that f(hi) != f(lo).
    """

    target = f(lo)

    while hi - lo > 1:
        midpoint = lo + (hi - lo) // 2
        if f(midpoint) == target:
            lo = midpoint
        else:
            hi = midpoint
    return lo

191 

192 

class LazySequenceCopy(Generic[T]):
    """Behaves like a mutable copy of a sequence without copying it.

    Writes go into an overlay dict and pops into a sorted set of dropped
    indices, so "copying" is O(1) no matter how long the underlying
    sequence is — effectively list(x) for free. Only part of the list
    API is implemented, but there's no reason in principle the rest
    couldn't be."""

    def __init__(self, values: Sequence[T]):
        self.__values = values
        self.__len = len(values)
        self.__mask: dict[int, T] | None = None
        self.__popped_indices: SortedList[int] | None = None

    def __len__(self) -> int:
        popped = self.__popped_indices
        return self.__len if popped is None else self.__len - len(popped)

    def pop(self, i: int = -1) -> T:
        if not len(self):
            raise IndexError("Cannot pop from empty list")
        i = self.__underlying_index(i)

        v = None
        if self.__mask is not None:
            v = self.__mask.pop(i, None)
        if v is None:
            v = self.__values[i]

        if self.__popped_indices is None:
            self.__popped_indices = SortedList()
        self.__popped_indices.add(i)
        return v

    def swap(self, i: int, j: int) -> None:
        """Swap the elements ls[i], ls[j]."""
        if i != j:
            self[i], self[j] = self[j], self[i]

    def __getitem__(self, i: int) -> T:
        i = self.__underlying_index(i)

        value = self.__values[i]
        if self.__mask is not None:
            value = self.__mask.get(i, value)
        return value

    def __setitem__(self, i: int, v: T) -> None:
        i = self.__underlying_index(i)
        if self.__mask is None:
            self.__mask = {}
        self.__mask[i] = v

    def __underlying_index(self, i: int) -> int:
        n = len(self)
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [0, {n})")
        if i < 0:
            i += n
        assert 0 <= i < n

        if self.__popped_indices is not None:
            # Translate an index in the popped view into an index in the
            # underlying sequence: every popped slot at or before position
            # i shifts the real index up by one. Given
            #   l = [1, 4, 2, 10, 188]
            #   l.pop(3)
            #   l.pop(1)
            #   assert l == [1, 2, 188]
            # we want l[i] == self.__values[f(i)], where f is this function.
            assert len(self.__popped_indices) <= len(self.__values)

            for popped in self.__popped_indices:
                if popped > i:
                    break
                i += 1
        return i

    # even though we have len + getitem, mypyc requires iter.
    def __iter__(self) -> Iterable[T]:
        for idx in range(len(self)):
            yield self[idx]

277 

278 

def stack_depth_of_caller() -> int:
    """Get stack size for caller's frame.

    A simple but much faster alternative to ``len(inspect.stack(0))``,
    from https://stackoverflow.com/a/47956089/9297601 . Used together
    with get/set recursionlimit to make stack overflows non-flaky; see
    https://github.com/HypothesisWorks/hypothesis/issues/2494 for details.
    """
    depth = 1
    frame = sys._getframe(2)
    while frame is not None:
        depth += 1
        frame = frame.f_back
    return depth

293 

294 

class StackframeLimiter:
    # Makes the recursion-limit warning issued via ensure_free_stackframes
    # thread-safe. Every value we pass to sys.setrecursionlimit is recorded
    # in _known_limits, and a warning is only issued when
    # sys.getrecursionlimit() returns a value we did not set ourselves.
    #
    # This under-approximates the ideal warning condition, since an
    # unrelated caller could coincidentally set the recursion limit to one
    # of our known values. _known_limits is currently reset only once every
    # ensure_free_stackframes context has exited; a per-limit refcount
    # (dropping each limit as its count hits zero) would make the warning
    # slightly more precise, but that extra complexity doesn't seem worth
    # it for what is only a nice-to-have warning.

    def __init__(self):
        self._active_contexts = 0
        self._known_limits: set[int] = set()
        self._original_limit: int | None = None

    def _setrecursionlimit(self, new_limit: int, *, check: bool = True) -> None:
        if check:
            current_limit = sys.getrecursionlimit()
            if current_limit not in self._known_limits:
                # Someone else changed the limit mid-test; leave it alone.
                warnings.warn(
                    "The recursion limit will not be reset, since it was changed "
                    f"during test execution (from {self._original_limit} to {current_limit}).",
                    HypothesisWarning,
                    stacklevel=4,
                )
                return

        self._known_limits.add(new_limit)
        sys.setrecursionlimit(new_limit)

    def enter_context(self, new_limit: int, *, current_limit: int) -> None:
        if not self._active_contexts:
            # First context on the stack: remember the true original limit
            # so the last exit_context can restore it.
            assert self._original_limit is None
            self._original_limit = current_limit
            self._known_limits.add(current_limit)

        self._active_contexts += 1
        self._setrecursionlimit(new_limit)

    def exit_context(self, new_limit: int, *, check: bool = True) -> None:
        assert self._active_contexts > 0
        self._active_contexts -= 1

        if self._active_contexts:
            self._setrecursionlimit(new_limit, check=check)
            return

        # Last context out: restore the true original limit and forget
        # every other limit we knew about.
        original_limit = self._original_limit
        assert original_limit is not None
        try:
            self._setrecursionlimit(original_limit, check=check)
        finally:
            self._original_limit = None
            # Clear the known limits, but keep the one we just restored.
            self._known_limits = {original_limit}

360 

361 

# Module-wide singleton: every ensure_free_stackframes instance shares this
# limiter, and all calls into it are serialized through the lock below.
_stackframe_limiter = StackframeLimiter()
_stackframe_limiter_lock = Lock()

364 

365 

class ensure_free_stackframes:
    """Context manager that ensures there are at least N free stackframes (for
    a reasonable value of N).

    On entry, raises the interpreter recursion limit to the caller's current
    stack depth plus 2000 via the shared StackframeLimiter; on exit, asks the
    limiter to restore the previous limit. All limiter calls happen under the
    module-level lock.
    """

    def __enter__(self) -> None:
        cur_depth = stack_depth_of_caller()
        with _stackframe_limiter_lock:
            self.old_limit = sys.getrecursionlimit()
            # The default CPython recursionlimit is 1000, but pytest seems to bump
            # it to 3000 during test execution. Let's make it something reasonable:
            self.new_limit = cur_depth + 2000
            # Because we add to the recursion limit, to be good citizens we also
            # add a check for unbounded recursion. The default limit is typically
            # 1000/3000, so this can only ever trigger if something really strange
            # is happening and it's hard to imagine an
            # intentionally-deeply-recursive use of this code.
            assert cur_depth <= 1000, (
                "Hypothesis would usually add %d to the stack depth of %d here, "
                "but we are already much deeper than expected. Aborting now, to "
                "avoid extending the stack limit in an infinite loop..."
                % (self.new_limit - self.old_limit, self.old_limit)
            )
            try:
                _stackframe_limiter.enter_context(
                    self.new_limit, current_limit=self.old_limit
                )
            except Exception:
                # if the stackframe limiter raises a HypothesisWarning (under eg
                # -Werror), __exit__ is not called, since we errored in __enter__.
                # Preserve the state of the stackframe limiter by exiting, and
                # avoid showing a duplicate warning with check=False.
                _stackframe_limiter.exit_context(self.old_limit, check=False)
                raise

    def __exit__(self, *args, **kwargs):
        # Restore the limit we saved in __enter__ (the limiter warns instead
        # of restoring if the limit was changed behind our back).
        with _stackframe_limiter_lock:
            _stackframe_limiter.exit_context(self.old_limit)

404 

405 

def find_integer(f: Callable[[int], bool]) -> int:
    """Finds a (hopefully large) integer such that f(n) is True and f(n + 1) is
    False.

    f(0) is assumed to be True and will not be checked.
    """
    # Check the small values directly: when the answer is tiny, anything
    # cleverer than a linear scan just wastes calls to f (if the answer is
    # 0 and we probed 2 first, we'd have done twice the necessary work).
    # Only once f(4) is known True do we bother being clever.
    for candidate in range(1, 5):
        if not f(candidate):
            return candidate - 1

    # Exponential probe upwards for a failing value. Invariant throughout:
    # lo is the largest value known to satisfy f, and once the probe stops,
    # hi is the smallest value known not to.
    lo, hi = 4, 5
    while f(hi):
        lo, hi = hi, hi * 2

    # Bisect until lo and hi are adjacent, at which point f(lo) holds and
    # f(lo + 1) does not, as desired.
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if f(mid):
            lo = mid
        else:
            hi = mid
    return lo

442 

443 

# Lazily-initialized GC timing state, set up on the first call to
# gc_cumulative_time() below.
_gc_initialized = False
# perf_counter timestamp of the most recent "start" GC callback, or 0 when
# there is no matching start (e.g. after a RecursionError in the callback).
_gc_start: float = 0
# Total seconds observed inside garbage collection since initialization.
_gc_cumulative_time: float = 0

# Since gc_callback potentially runs in test context, and perf_counter
# might be monkeypatched, we store a reference to the real one.
_perf_counter = time.perf_counter

451 

452 

def gc_cumulative_time() -> float:
    """Return the total time in seconds spent in garbage collection since the
    first call to this function (which installs the instrumentation).

    Supports CPython (via ``gc.callbacks``) and PyPy (via ``gc.hooks``); on
    any other interpreter the returned value stays at zero.
    """
    global _gc_initialized

    # I don't believe we need a lock for the _gc_cumulative_time increment here,
    # since afaik each gc callback is only executed once when the garbage collector
    # runs, by the thread which initiated the gc.

    if not _gc_initialized:
        if hasattr(gc, "callbacks"):
            # CPython: gc invokes each callback with "start"/"stop" phases;
            # we accumulate the elapsed time between matched pairs.
            def gc_callback(
                phase: Literal["start", "stop"], info: dict[str, int]
            ) -> None:
                global _gc_start, _gc_cumulative_time
                try:
                    now = _perf_counter()
                    if phase == "start":
                        _gc_start = now
                    elif phase == "stop" and _gc_start > 0:
                        _gc_cumulative_time += now - _gc_start  # pragma: no cover # ??
                except RecursionError:  # pragma: no cover
                    # Avoid flakiness via UnraisableException, which is caught and
                    # warned by pytest. The actual callback (this function) is
                    # validated to never trigger a RecursionError itself when
                    # when called by gc.collect.
                    # Anyway, we should hit the same error on "start"
                    # and "stop", but to ensure we don't get out of sync we just
                    # signal that there is no matching start.
                    _gc_start = 0
                    return

            # Registered at the front of the callback list — presumably so
            # other callbacks' work is included in the timing (TODO confirm).
            gc.callbacks.insert(0, gc_callback)
        elif hasattr(gc, "hooks"):  # pragma: no cover # pypy only
            # PyPy: hooks report a stats object carrying the GC duration.
            def hook(stats: Any) -> None:
                global _gc_cumulative_time
                try:
                    _gc_cumulative_time += stats.duration
                except RecursionError:
                    pass

            # Only claim the hook slots nobody else is using.
            if gc.hooks.on_gc_minor is None:
                gc.hooks.on_gc_minor = hook
            if gc.hooks.on_gc_collect_step is None:
                gc.hooks.on_gc_collect_step = hook

        _gc_initialized = True

    return _gc_cumulative_time

502 

503 

def startswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if sequence ``l1`` begins with the elements of ``l2``.

    An empty ``l2`` is a prefix of anything; elements are compared with ==.
    """
    if len(l1) < len(l2):
        return False
    # zip truncates to the shorter input (l2, after the length check above),
    # so there is no need to build a slice copy of l1's prefix.
    return all(a == b for a, b in zip(l1, l2))

508 

509 

def endswith(l1: Sequence[T], l2: Sequence[T]) -> bool:
    """Return True if sequence ``l1`` ends with the elements of ``l2``.

    An empty ``l2`` is a suffix of anything; elements are compared with ==.
    """
    if len(l1) < len(l2):
        return False
    # Compare back-to-front; zip truncates to the shorter input (l2, after
    # the length check above), avoiding a slice copy of l1's tail.
    return all(a == b for a, b in zip(reversed(l1), reversed(l2)))

514 

515 

def bits_to_bytes(n: int) -> int:
    """The number of bytes required to represent an n-bit number.

    Written as a shift rather than the equivalent (n + 7) // 8 because
    this is genuinely hot enough for the difference to matter."""
    return (n + 7) >> 3