Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/internal/conjecture/junkdrawer.py: 48%

223 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""A module for miscellaneous useful bits and bobs that don't
obviously belong anywhere else. If you spot a better home for
anything that lives here, please move it."""

import array
import gc
import sys
import time
import warnings
from random import Random
from typing import (
    Callable,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
    overload,
)

from sortedcontainers import SortedList

from hypothesis.errors import HypothesisWarning

ARRAY_CODES = ["B", "H", "I", "L", "Q", "O"]

T = TypeVar("T")


def array_or_list(
    code: str, contents: Iterable[int]
) -> "Union[List[int], array.ArrayType[int]]":
    if code == "O":
        return list(contents)
    return array.array(code, contents)

def replace_all(
    ls: Sequence[T],
    replacements: Iterable[Tuple[int, int, Sequence[T]]],
) -> List[T]:
    """Substitute multiple replacement values into a list.

    Replacements is a list of (start, end, value) triples.
    """

    result: List[T] = []
    prev = 0
    offset = 0
    for u, v, r in replacements:
        result.extend(ls[prev:u])
        result.extend(r)
        prev = v
        offset += len(r) - (v - u)
    result.extend(ls[prev:])
    assert len(result) == len(ls) + offset
    return result
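
# A minimal usage sketch (illustrative only, not part of the module): splice
# three 9s over indices 1..2, assuming the replacement ranges are
# non-overlapping and given in increasing order of position.
assert replace_all([0, 1, 2, 3, 4], [(1, 3, [9, 9, 9])]) == [0, 9, 9, 9, 3, 4]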

NEXT_ARRAY_CODE = dict(zip(ARRAY_CODES, ARRAY_CODES[1:]))


class IntList(Sequence[int]):
    """Class for storing a list of non-negative integers compactly.

    We store them as the smallest size integer array we can get
    away with. When we try to add an integer that is too large,
    we upgrade the array to the smallest word size needed to store
    the new value."""

    __slots__ = ("__underlying",)

    __underlying: "Union[List[int], array.ArrayType[int]]"

    def __init__(self, values: Sequence[int] = ()):
        for code in ARRAY_CODES:
            try:
                underlying = array_or_list(code, values)
                break
            except OverflowError:
                pass
        else:  # pragma: no cover
            raise AssertionError(f"Could not create storage for {values!r}")
        if isinstance(underlying, list):
            for v in underlying:
                if not isinstance(v, int) or v < 0:
                    raise ValueError(f"Could not create IntList for {values!r}")
        self.__underlying = underlying

    @classmethod
    def of_length(cls, n: int) -> "IntList":
        return cls(array_or_list("B", [0]) * n)

    def count(self, value: int) -> int:
        return self.__underlying.count(value)

    def __repr__(self):
        return f"IntList({list(self.__underlying)!r})"

    def __len__(self):
        return len(self.__underlying)

    @overload
    def __getitem__(self, i: int) -> int: ...  # pragma: no cover

    @overload
    def __getitem__(self, i: slice) -> "IntList": ...  # pragma: no cover

    def __getitem__(self, i: Union[int, slice]) -> "Union[int, IntList]":
        if isinstance(i, slice):
            return IntList(self.__underlying[i])
        return self.__underlying[i]

    def __delitem__(self, i: int) -> None:
        del self.__underlying[i]

    def insert(self, i: int, v: int) -> None:
        self.__underlying.insert(i, v)

    def __iter__(self) -> Iterator[int]:
        return iter(self.__underlying)

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, IntList):
            return NotImplemented
        return self.__underlying == other.__underlying

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, IntList):
            return NotImplemented
        return self.__underlying != other.__underlying

    def append(self, n: int) -> None:
        i = len(self)
        self.__underlying.append(0)
        self[i] = n

    def __setitem__(self, i: int, n: int) -> None:
        while True:
            try:
                self.__underlying[i] = n
                return
            except OverflowError:
                assert n > 0
                self.__upgrade()

    def extend(self, ls: Iterable[int]) -> None:
        for n in ls:
            self.append(n)

    def __upgrade(self) -> None:
        assert isinstance(self.__underlying, array.array)
        code = NEXT_ARRAY_CODE[self.__underlying.typecode]
        self.__underlying = array_or_list(code, self.__underlying)
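
# A small sketch (illustrative only, not part of the module) of IntList's
# storage upgrade: values start in an unsigned-byte array and are transparently
# re-encoded with a wider typecode when a larger value is stored.
xs = IntList([1, 2, 3])
xs.append(2**40)  # too large for the current typecode, so __upgrade() kicks in
assert list(xs) == [1, 2, 3, 2**40] and len(xs) == 4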

def binary_search(lo: int, hi: int, f: Callable[[int], bool]) -> int:
    """Binary searches in [lo, hi) to find
    n such that f(n) == f(lo) but f(n + 1) != f(lo).
    It is implicitly assumed and will not be checked
    that f(hi) != f(lo).
    """

    reference = f(lo)

    while lo + 1 < hi:
        mid = (lo + hi) // 2
        if f(mid) == reference:
            lo = mid
        else:
            hi = mid
    return lo
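
# An illustrative call (not part of the module): find the largest n in [0, 100)
# with n * n <= 50, relying on the documented precondition that f(hi) != f(lo).
assert binary_search(0, 100, lambda n: n * n <= 50) == 7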

def uniform(random: Random, n: int) -> bytes:
    """Returns a bytestring of length n, distributed uniformly at random."""
    return random.getrandbits(n * 8).to_bytes(n, "big")


class LazySequenceCopy:
    """A "copy" of a sequence that works by inserting a mask in front
    of the underlying sequence, so that you can mutate it without changing
    the underlying sequence. Effectively behaves as if you could do list(x)
    in O(1) time. The full list API is not supported yet but there's no reason
    in principle it couldn't be."""

    def __init__(self, values: Sequence[int]):
        self.__values = values
        self.__len = len(values)
        self.__mask: Optional[Dict[int, int]] = None
        self.__popped_indices: Optional[SortedList] = None

    def __len__(self) -> int:
        if self.__popped_indices is None:
            return self.__len
        return self.__len - len(self.__popped_indices)

    def pop(self, i: int = -1) -> int:
        if len(self) == 0:
            raise IndexError("Cannot pop from empty list")
        i = self.__underlying_index(i)

        v = None
        if self.__mask is not None:
            v = self.__mask.pop(i, None)
        if v is None:
            v = self.__values[i]

        if self.__popped_indices is None:
            self.__popped_indices = SortedList()
        self.__popped_indices.add(i)
        return v

    def __getitem__(self, i: int) -> int:
        i = self.__underlying_index(i)

        default = self.__values[i]
        if self.__mask is None:
            return default
        else:
            return self.__mask.get(i, default)

    def __setitem__(self, i: int, v: int) -> None:
        i = self.__underlying_index(i)
        if self.__mask is None:
            self.__mask = {}
        self.__mask[i] = v

    def __underlying_index(self, i: int) -> int:
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [0, {n})")
        if i < 0:
            i += n
        assert 0 <= i < n

        if self.__popped_indices is not None:
            # given an index i in the popped representation of the list, compute
            # its corresponding index in the underlying list. given
            #   l = [1, 4, 2, 10, 188]
            #   l.pop(3)
            #   l.pop(1)
            #   assert l == [1, 2, 188]
            #
            # we want l[i] == self.__values[f(i)], where f is this function.
            assert len(self.__popped_indices) <= len(self.__values)

            for idx in self.__popped_indices:
                if idx > i:
                    break
                i += 1
        return i
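
# A short sketch (illustrative only, not part of the module): mutate and pop
# through the copy while the underlying sequence stays untouched.
base = [10, 20, 30]
lazy = LazySequenceCopy(base)
lazy[0] = 99
assert lazy.pop() == 30 and len(lazy) == 2
assert lazy[0] == 99 and base == [10, 20, 30]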

def clamp(lower: float, value: float, upper: float) -> float:
    """Given a value and lower/upper bounds, 'clamp' the value so that
    it satisfies lower <= value <= upper."""
    return max(lower, min(value, upper))


def swap(ls: LazySequenceCopy, i: int, j: int) -> None:
    """Swap the elements ls[i], ls[j]."""
    if i == j:
        return
    ls[i], ls[j] = ls[j], ls[i]


def stack_depth_of_caller() -> int:
    """Get stack size for caller's frame.

    From https://stackoverflow.com/a/47956089/9297601 , this is a simple
    but much faster alternative to `len(inspect.stack(0))`. We use it
    with get/set recursionlimit to make stack overflows non-flaky; see
    https://github.com/HypothesisWorks/hypothesis/issues/2494 for details.
    """
    frame = sys._getframe(2)
    size = 1
    while frame:
        frame = frame.f_back  # type: ignore[assignment]
        size += 1
    return size


class ensure_free_stackframes:
    """Context manager that ensures there are at least N free stackframes (for
    a reasonable value of N).
    """

    def __enter__(self):
        cur_depth = stack_depth_of_caller()
        self.old_maxdepth = sys.getrecursionlimit()
        # The default CPython recursionlimit is 1000, but pytest seems to bump
        # it to 3000 during test execution. Let's make it something reasonable:
        self.new_maxdepth = cur_depth + 2000
        # Because we add to the recursion limit, to be good citizens we also
        # add a check for unbounded recursion. The default limit is typically
        # 1000/3000, so this can only ever trigger if something really strange
        # is happening and it's hard to imagine an
        # intentionally-deeply-recursive use of this code.
        assert cur_depth <= 1000, (
            "Hypothesis would usually add %d to the stack depth of %d here, "
            "but we are already much deeper than expected. Aborting now, to "
            "avoid extending the stack limit in an infinite loop..."
            % (self.new_maxdepth - self.old_maxdepth, self.old_maxdepth)
        )
        sys.setrecursionlimit(self.new_maxdepth)

    def __exit__(self, *args, **kwargs):
        if self.new_maxdepth == sys.getrecursionlimit():
            sys.setrecursionlimit(self.old_maxdepth)
        else:  # pragma: no cover
            warnings.warn(
                "The recursion limit will not be reset, since it was changed "
                "from another thread or during execution of a test.",
                HypothesisWarning,
                stacklevel=2,
            )
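
# A hedged usage sketch (not part of the module): temporarily raise the
# recursion limit around a deeply recursive computation, restoring it on exit.
# The helper name example_depth is illustrative only.
def example_depth(n: int) -> int:
    return 0 if n == 0 else 1 + example_depth(n - 1)

with ensure_free_stackframes():
    # Deeper than the default limit of 1000, but within cur_depth + 2000.
    assert example_depth(1500) == 1500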

def find_integer(f: Callable[[int], bool]) -> int:
    """Finds a (hopefully large) integer such that f(n) is True and f(n + 1) is
    False.

    f(0) is assumed to be True and will not be checked.
    """
    # We first do a linear scan over the small numbers and only start to do
    # anything intelligent if f(4) is true. This is because it's very hard to
    # win big when the result is small. If the result is 0 and we try 2 first
    # then we've done twice as much work as we needed to!
    for i in range(1, 5):
        if not f(i):
            return i - 1

    # We now know that f(4) is true. We want to find some number for which
    # f(n) is *not* true.
    # lo is the largest number for which we know that f(lo) is true.
    lo = 4

    # Exponential probe upwards until we find some value hi such that f(hi)
    # is not true. Subsequently we maintain the invariant that hi is the
    # smallest number for which we know that f(hi) is not true.
    hi = 5
    while f(hi):
        lo = hi
        hi *= 2

    # Now binary search until lo + 1 = hi. At that point we have f(lo) and not
    # f(lo + 1), as desired.
    while lo + 1 < hi:
        mid = (lo + hi) // 2
        if f(mid):
            lo = mid
        else:
            hi = mid
    return lo
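
# An illustrative call (not part of the module): exponential probing followed
# by binary search recovers the largest n for which the predicate holds, using
# only O(log n) evaluations of f.
assert find_integer(lambda n: n <= 1_000_000) == 1_000_000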

class NotFound(Exception):
    pass


class SelfOrganisingList(Generic[T]):
    """A self-organising list with the move-to-front heuristic.

    A self-organising list is a collection from which we want to retrieve
    items that satisfy some predicate. There is no faster way to do this than
    a linear scan (as the predicates may be arbitrary), but the performance
    of a linear scan can vary dramatically - if we happen to find a good item
    on the first try it's O(1) after all. The idea of a self-organising list is
    to reorder the list to try to get lucky this way as often as possible.

    There are various heuristics we could use for this, and it's not clear
    which are best. We use the simplest, which is that every time we find
    an item we move it to the "front" (actually the back in our implementation
    because we iterate in reverse) of the list.

    """

    def __init__(self, values: Iterable[T] = ()) -> None:
        self.__values = list(values)

    def __repr__(self) -> str:
        return f"SelfOrganisingList({self.__values!r})"

    def add(self, value: T) -> None:
        """Add a value to this list."""
        self.__values.append(value)

    def find(self, condition: Callable[[T], bool]) -> T:
        """Returns some value in this list such that ``condition(value)`` is
        True. If no such value exists raises ``NotFound``."""
        for i in range(len(self.__values) - 1, -1, -1):
            value = self.__values[i]
            if condition(value):
                del self.__values[i]
                self.__values.append(value)
                return value
        raise NotFound("No values satisfying condition")
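
# A small sketch (illustrative only, not part of the module) of the
# move-to-front behaviour: a successful find moves the match to the end of the
# internal list, which is where the reverse iteration starts next time.
sol = SelfOrganisingList([1, 2, 3, 4])
assert sol.find(lambda v: v % 2 == 0) == 4  # scanning from the back, 4 matches first
assert sol.find(lambda v: v < 3) == 2       # 2 is now moved to the "front"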

_gc_initialized = False
_gc_start = 0
_gc_cumulative_time = 0


def gc_cumulative_time() -> float:
    global _gc_initialized
    if not _gc_initialized:
        if hasattr(gc, "callbacks"):
            # CPython
            def gc_callback(phase, info):
                global _gc_start, _gc_cumulative_time
                try:
                    now = time.perf_counter()
                    if phase == "start":
                        _gc_start = now
                    elif phase == "stop" and _gc_start > 0:
                        _gc_cumulative_time += now - _gc_start  # pragma: no cover # ??
                except RecursionError:  # pragma: no cover
                    # Avoid flakiness via UnraisableException, which is caught and
                    # warned by pytest. The actual callback (this function) is
                    # validated to never trigger a RecursionError itself when
                    # called by gc.collect.
                    # Anyway, we should hit the same error on "start"
                    # and "stop", but to ensure we don't get out of sync we just
                    # signal that there is no matching start.
                    _gc_start = 0
                    return

            gc.callbacks.insert(0, gc_callback)
        elif hasattr(gc, "hooks"):  # pragma: no cover  # pypy only
            # PyPy
            def hook(stats):
                global _gc_cumulative_time
                try:
                    _gc_cumulative_time += stats.duration
                except RecursionError:
                    pass

            if gc.hooks.on_gc_minor is None:
                gc.hooks.on_gc_minor = hook
            if gc.hooks.on_gc_collect_step is None:
                gc.hooks.on_gc_collect_step = hook

        _gc_initialized = True

    return _gc_cumulative_time
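
# A hedged sketch (not part of the module) of how the counter might be
# consumed: sample it before and after a block of work to estimate time spent
# in garbage collection. The names below are illustrative only.
before = gc_cumulative_time()
gc.collect()  # force at least one collection so the callback fires
assert gc_cumulative_time() - before >= 0.0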