Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/internal/conjecture/engine.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

638 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import importlib 

12import math 

13import time 

14from collections import defaultdict 

15from contextlib import contextmanager 

16from datetime import timedelta 

17from enum import Enum 

18from random import Random, getrandbits 

19from typing import ( 

20 Any, 

21 Callable, 

22 Dict, 

23 Final, 

24 FrozenSet, 

25 Generator, 

26 List, 

27 Literal, 

28 NoReturn, 

29 Optional, 

30 Set, 

31 Tuple, 

32 Union, 

33 overload, 

34) 

35 

36import attr 

37 

38from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings 

39from hypothesis._settings import local_settings 

40from hypothesis.database import ExampleDatabase 

41from hypothesis.errors import InvalidArgument, StopTest 

42from hypothesis.internal.cache import LRUReusedCache 

43from hypothesis.internal.compat import ( 

44 NotRequired, 

45 TypeAlias, 

46 TypedDict, 

47 ceil, 

48 int_from_bytes, 

49 override, 

50) 

51from hypothesis.internal.conjecture.data import ( 

52 AVAILABLE_PROVIDERS, 

53 ConjectureData, 

54 ConjectureResult, 

55 DataObserver, 

56 Example, 

57 HypothesisProvider, 

58 InterestingOrigin, 

59 IRNode, 

60 Overrun, 

61 PrimitiveProvider, 

62 Status, 

63 _Overrun, 

64 ir_kwargs_key, 

65 ir_value_key, 

66) 

67from hypothesis.internal.conjecture.datatree import ( 

68 DataTree, 

69 PreviouslyUnseenBehaviour, 

70 TreeRecordingObserver, 

71) 

72from hypothesis.internal.conjecture.junkdrawer import clamp, ensure_free_stackframes 

73from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser 

74from hypothesis.internal.conjecture.shrinker import Shrinker, sort_key 

75from hypothesis.internal.healthcheck import fail_health_check 

76from hypothesis.reporting import base_report, report 

77 

78MAX_SHRINKS: Final[int] = 500 

79CACHE_SIZE: Final[int] = 10000 

80MUTATION_POOL_SIZE: Final[int] = 100 

81MIN_TEST_CALLS: Final[int] = 10 

82BUFFER_SIZE: Final[int] = 8 * 1024 

83 

84# If the shrinking phase takes more than five minutes, abort it early and print 

85# a warning. Many CI systems will kill a build after around ten minutes with 

86# no output, and appearing to hang isn't great for interactive use either - 

87# showing partially-shrunk examples is better than quitting with no examples! 

88# (but make it monkeypatchable, for the rare users who need to keep on shrinking) 

89MAX_SHRINKING_SECONDS: Final[int] = 300 

90 

91Ls: TypeAlias = List["Ls | int"] 

92 

93 

@attr.s
class HealthCheckState:
    """Counters accumulated while the startup health checks are active.

    Tracks how many valid / invalid / overrun examples we have seen, plus the
    per-argument draw times used to diagnose slow data generation.
    """

    valid_examples: int = attr.ib(default=0)
    invalid_examples: int = attr.ib(default=0)
    overrun_examples: int = attr.ib(default=0)
    draw_times: "defaultdict[str, List[float]]" = attr.ib(
        factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        # Feed a flattening generator straight into fsum: the previous
        # sum(..., start=[]) built every intermediate concatenated list,
        # which is quadratic in the number of per-argument lists.
        return math.fsum(t for times in self.draw_times.values() for t in times)

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(len(k[len("generate:") :].strip(": ")) for k in self.draw_times)
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        # Hoisted out of the loop: total_draw_time is a property that
        # re-sums every list on each access, and it is loop-invariant here.
        total_time = self.total_draw_time
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < total_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname[len("generate:") :].strip(": ")  # removeprefix in py3.9
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/total_time:>7.0%} | {desc}"
            )
        return "\n".join(out)

132 

133 

class ExitReason(Enum):
    """Why the engine stopped. Values are templates for the final report."""

    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        """Render this reason's template using values from ``settings``."""
        template = self.value
        return template.format(s=settings)

147 

148 

class RunIsComplete(Exception):
    """Internal control-flow signal: raised by ``exit_with`` to unwind out of a run."""

151 

152 

def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    """Resolve ``backend`` to its provider, honouring the provider lifetime.

    ``test_function``-lifetime providers are instantiated once, here;
    ``test_case``-lifetime providers are returned as a class to be
    instantiated fresh for every test case.

    Raises InvalidArgument for any other declared lifetime.
    """
    module_name, class_name = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(module_name), class_name)
    lifetime = provider_cls.lifetime
    if lifetime == "test_function":
        return provider_cls(None)
    if lifetime == "test_case":
        return provider_cls
    raise InvalidArgument(
        f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
        "Expected one of 'test_function', 'test_case'."
    )

165 

166 

class CallStats(TypedDict):
    """Statistics recorded for one call to the user's test function."""

    status: str  # lower-cased Status name, e.g. "valid"
    runtime: float  # wall-clock seconds spent in the call
    drawtime: float  # seconds spent drawing data
    gctime: float  # seconds spent in garbage collection
    events: List[str]  # sorted, rendered event strings

173 

174 

# Per-phase entry of the final statistics. Functional TypedDict syntax is
# required because the keys contain hyphens.
PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": List[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
# Whole-run statistics; phases that did not run simply omit their key.
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[Dict[str, float]],
    },
)

194 

195 

196class ConjectureRunner: 

197 def __init__( 

198 self, 

199 test_function: Callable[[ConjectureData], None], 

200 *, 

201 settings: Optional[Settings] = None, 

202 random: Optional[Random] = None, 

203 database_key: Optional[bytes] = None, 

204 ignore_limits: bool = False, 

205 ) -> None: 

206 self._test_function: Callable[[ConjectureData], None] = test_function 

207 self.settings: Settings = settings or Settings() 

208 self.shrinks: int = 0 

209 self.finish_shrinking_deadline: Optional[float] = None 

210 self.call_count: int = 0 

211 self.misaligned_count: int = 0 

212 self.valid_examples: int = 0 

213 self.random: Random = random or Random(getrandbits(128)) 

214 self.database_key: Optional[bytes] = database_key 

215 self.ignore_limits: bool = ignore_limits 

216 

217 # Global dict of per-phase statistics, and a list of per-call stats 

218 # which transfer to the global dict at the end of each phase. 

219 self._current_phase: str = "(not a phase)" 

220 self.statistics: StatisticsDict = {} 

221 self.stats_per_test_case: List[CallStats] = [] 

222 

223 # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests. 

224 self.interesting_examples: Dict[InterestingOrigin, ConjectureResult] = {} 

225 # We use call_count because there may be few possible valid_examples. 

226 self.first_bug_found_at: Optional[int] = None 

227 self.last_bug_found_at: Optional[int] = None 

228 

229 # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests. 

230 self.shrunk_examples: Set[Optional[InterestingOrigin]] = set() 

231 

232 self.health_check_state: Optional[HealthCheckState] = None 

233 

234 self.tree: DataTree = DataTree() 

235 

236 self.provider: Union[type, PrimitiveProvider] = _get_provider( 

237 self.settings.backend 

238 ) 

239 

240 self.best_observed_targets: "defaultdict[str, float]" = defaultdict( 

241 lambda: NO_SCORE 

242 ) 

243 self.best_examples_of_observed_targets: Dict[str, ConjectureResult] = {} 

244 

245 # We keep the pareto front in the example database if we have one. This 

246 # is only marginally useful at present, but speeds up local development 

247 # because it means that large targets will be quickly surfaced in your 

248 # testing. 

249 self.pareto_front: Optional[ParetoFront] = None 

250 if self.database_key is not None and self.settings.database is not None: 

251 self.pareto_front = ParetoFront(self.random) 

252 self.pareto_front.on_evict(self.on_pareto_evict) 

253 

254 # We want to be able to get the ConjectureData object that results 

255 # from running a buffer without recalculating, especially during 

256 # shrinking where we need to know about the structure of the 

257 # executed test case. 

258 self.__data_cache = LRUReusedCache(CACHE_SIZE) 

259 self.__data_cache_ir = LRUReusedCache(CACHE_SIZE) 

260 

261 self.__pending_call_explanation: Optional[str] = None 

262 self._switch_to_hypothesis_provider: bool = False 

263 

    def explain_next_call_as(self, explanation: str) -> None:
        """Stash a message to be debug-logged just before the next test call."""
        self.__pending_call_explanation = explanation

266 

    def clear_call_explanation(self) -> None:
        """Discard any pending call explanation without logging it."""
        self.__pending_call_explanation = None

269 

270 @contextmanager 

271 def _log_phase_statistics( 

272 self, phase: Literal["reuse", "generate", "shrink"] 

273 ) -> Generator[None, None, None]: 

274 self.stats_per_test_case.clear() 

275 start_time = time.perf_counter() 

276 try: 

277 self._current_phase = phase 

278 yield 

279 finally: 

280 # We ignore the mypy type error here. Because `phase` is a string literal and "-phase" is a string literal 

281 # as well, the concatenation will always be valid key in the dictionary. 

282 self.statistics[phase + "-phase"] = { # type: ignore 

283 "duration-seconds": time.perf_counter() - start_time, 

284 "test-cases": list(self.stats_per_test_case), 

285 "distinct-failures": len(self.interesting_examples), 

286 "shrinks-successful": self.shrinks, 

287 } 

288 

    @property
    def should_optimise(self) -> bool:
        # The target-optimisation phase runs only when enabled in settings.
        return Phase.target in self.settings.phases

292 

293 def __tree_is_exhausted(self) -> bool: 

294 return self.tree.is_exhausted and self.settings.backend == "hypothesis" 

295 

296 def __stoppable_test_function(self, data: ConjectureData) -> None: 

297 """Run ``self._test_function``, but convert a ``StopTest`` exception 

298 into a normal return and avoid raising Flaky for RecursionErrors. 

299 """ 

300 # We ensure that the test has this much stack space remaining, no 

301 # matter the size of the stack when called, to de-flake RecursionErrors 

302 # (#2494, #3671). Note, this covers the data generation part of the test; 

303 # the actual test execution is additionally protected at the call site 

304 # in hypothesis.core.execute_once. 

305 with ensure_free_stackframes(): 

306 try: 

307 self._test_function(data) 

308 except StopTest as e: 

309 if e.testcounter == data.testcounter: 

310 # This StopTest has successfully stopped its test, and can now 

311 # be discarded. 

312 pass 

313 else: 

314 # This StopTest was raised by a different ConjectureData. We 

315 # need to re-raise it so that it will eventually reach the 

316 # correct engine. 

317 raise 

318 

319 def _cache_key_ir( 

320 self, 

321 *, 

322 nodes: Optional[List[IRNode]] = None, 

323 data: Union[ConjectureData, ConjectureResult, None] = None, 

324 ) -> Tuple[Tuple[Any, ...], ...]: 

325 assert (nodes is not None) ^ (data is not None) 

326 extension = [] 

327 if data is not None: 

328 nodes = data.examples.ir_tree_nodes 

329 if data.invalid_at is not None: 

330 # if we're invalid then we should have at least one node left (the invalid one). 

331 assert isinstance(data, ConjectureData) 

332 assert data.ir_tree_nodes is not None 

333 assert data._node_index < len(data.ir_tree_nodes) 

334 extension = [data.ir_tree_nodes[data._node_index]] 

335 assert nodes is not None 

336 

337 # intentionally drop was_forced from equality here, because the was_forced 

338 # of node prefixes on ConjectureData has no impact on that data's result 

339 return tuple( 

340 ( 

341 node.ir_type, 

342 ir_value_key(node.ir_type, node.value), 

343 ir_kwargs_key(node.ir_type, node.kwargs), 

344 ) 

345 for node in nodes + extension 

346 ) 

347 

348 def _cache(self, data: ConjectureData) -> None: 

349 result = data.as_result() 

350 # when we shrink, we try out of bounds things, which can lead to the same 

351 # data.buffer having multiple outcomes. eg data.buffer=b'' is Status.OVERRUN 

352 # in normal circumstances, but a data with 

353 # ir_nodes=[integer -5 {min_value: 0, max_value: 10}] will also have 

354 # data.buffer=b'' but will be Status.INVALID instead. We do not want to 

355 # change the cached value to INVALID in this case. 

356 # 

357 # We handle this specially for the ir cache by keying off the misaligned node 

358 # as well, but we cannot do the same for buffers as we do not know ahead of 

359 # time what buffer a node maps to. I think it's largely fine that we don't 

360 # write to the buffer cache here as we move more things to the ir cache. 

361 if data.invalid_at is None: 

362 self.__data_cache[data.buffer] = result 

363 

364 # interesting buffer-based data can mislead the shrinker if we cache them. 

365 # 

366 # @given(st.integers()) 

367 # def f(n): 

368 # assert n < 100 

369 # 

370 # may generate two counterexamples, n=101 and n=m > 101, in that order, 

371 # where the buffer corresponding to n is large due to eg failed probes. 

372 # We shrink m and eventually try n=101, but it is cached to a large buffer 

373 # and so the best we can do is n=102, a non-ideal shrink. 

374 # 

375 # We can cache ir-based buffers fine, which always correspond to the 

376 # smallest buffer via forced=. The overhead here is small because almost 

377 # all interesting data are ir-based via the shrinker (and that overhead 

378 # will tend towards zero as we move generation to the ir). 

379 if data.ir_tree_nodes is not None or data.status < Status.INTERESTING: 

380 key = self._cache_key_ir(data=data) 

381 self.__data_cache_ir[key] = result 

382 

383 def cached_test_function_ir( 

384 self, nodes: List[IRNode], *, error_on_discard: bool = False 

385 ) -> Union[ConjectureResult, _Overrun]: 

386 key = self._cache_key_ir(nodes=nodes) 

387 try: 

388 return self.__data_cache_ir[key] 

389 except KeyError: 

390 pass 

391 

392 # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver. 

393 # The reason is we don't expect simulate_test_function to explore new choices 

394 # and write back to the tree, so we don't want the overhead of the 

395 # TreeRecordingObserver tracking those calls. 

396 trial_observer: Optional[DataObserver] = DataObserver() 

397 if error_on_discard: 

398 

399 class DiscardObserver(DataObserver): 

400 @override 

401 def kill_branch(self) -> NoReturn: 

402 raise ContainsDiscard 

403 

404 trial_observer = DiscardObserver() 

405 

406 try: 

407 trial_data = self.new_conjecture_data_ir(nodes, observer=trial_observer) 

408 self.tree.simulate_test_function(trial_data) 

409 except PreviouslyUnseenBehaviour: 

410 pass 

411 else: 

412 trial_data.freeze() 

413 key = self._cache_key_ir(data=trial_data) 

414 try: 

415 return self.__data_cache_ir[key] 

416 except KeyError: 

417 pass 

418 

419 data = self.new_conjecture_data_ir(nodes) 

420 # note that calling test_function caches `data` for us, for both an ir 

421 # tree key and a buffer key. 

422 self.test_function(data) 

423 return data.as_result() 

424 

    def test_function(self, data: ConjectureData) -> None:
        """Run the test function once with ``data`` and update all engine state.

        Responsibilities: record per-call statistics, cache the result, feed
        targeting observations into the pareto front and best-target tables,
        store/minimise interesting (failing) examples, and enforce the
        engine's stopping conditions (each of which raises RunIsComplete).
        """
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1

        interrupted = False
        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BaseException:
            # Unexpected error: persist the buffer so the failure can be
            # replayed from the database on the next run.
            self.save_buffer(data.buffer)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                self._cache(data)
                if data.invalid_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        # Feed targeting observations into the pareto front; buffers that join
        # the front are persisted under the pareto sub-key.
        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_buffer(data.buffer, sub_key=b"pareto")

        assert len(data.buffer) <= BUFFER_SIZE

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                # Prefer a strictly better score, or an equal score achieved
                # with a shrink-order-smaller buffer.
                if v > existing_score or sort_key(data.buffer) < sort_key(
                    existing_example.buffer
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status == Status.VALID:
            self.valid_examples += 1

        if data.status == Status.INTERESTING:
            if self.settings.backend != "hypothesis":
                # Let alternative backends normalise values before we replay
                # the ir tree through the hypothesis provider below.
                for node in data.examples.ir_tree_nodes:
                    value = data.provider.post_test_case_hook(node.value)
                    # require providers to return something valid here.
                    assert (
                        value is not None
                    ), "providers must return a non-null value from post_test_case_hook"
                    node.value = value

                # drive the ir tree through the test function to convert it
                # to a buffer
                data = ConjectureData.for_ir_tree(data.examples.ir_tree_nodes)
                self.__stoppable_test_function(data)
                self._cache(data)

            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]  # type: ignore
            except KeyError:
                # First failure observed for this origin.
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.buffer) < sort_key(existing.buffer):
                    # Smaller example for a known origin: count a successful
                    # shrink and demote the old buffer to the secondary corpus.
                    self.shrinks += 1
                    self.downgrade_buffer(existing.buffer)
                    self.__data_cache.unpin(existing.buffer)
                    changed = True

            if changed:
                self.save_buffer(data.buffer)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                self.__data_cache.pin(data.buffer)
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

            if (
                not self.ignore_limits
                and self.finish_shrinking_deadline is not None
                and self.finish_shrinking_deadline < time.perf_counter()
            ):
                # See https://github.com/HypothesisWorks/hypothesis/issues/2340
                report(
                    "WARNING: Hypothesis has spent more than five minutes working to shrink"
                    " a failing example, and stopped because it is making very slow"
                    " progress. When you re-run your tests, shrinking will resume and may"
                    " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                    " provide a reproducing example, so that we can improve shrinking"
                    " performance for everyone."
                )
                self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)

573 

574 def on_pareto_evict(self, data: ConjectureData) -> None: 

575 self.settings.database.delete(self.pareto_key, data.buffer) 

576 

577 def generate_novel_prefix(self) -> bytes: 

578 """Uses the tree to proactively generate a starting sequence of bytes 

579 that we haven't explored yet for this test. 

580 

581 When this method is called, we assume that there must be at 

582 least one novel prefix left to find. If there were not, then the 

583 test run should have already stopped due to tree exhaustion. 

584 """ 

585 return self.tree.generate_novel_prefix(self.random) 

586 

587 def record_for_health_check(self, data: ConjectureData) -> None: 

588 # Once we've actually found a bug, there's no point in trying to run 

589 # health checks - they'll just mask the actually important information. 

590 if data.status == Status.INTERESTING: 

591 self.health_check_state = None 

592 

593 state = self.health_check_state 

594 

595 if state is None: 

596 return 

597 

598 for k, v in data.draw_times.items(): 

599 state.draw_times[k].append(v) 

600 

601 if data.status == Status.VALID: 

602 state.valid_examples += 1 

603 elif data.status == Status.INVALID: 

604 state.invalid_examples += 1 

605 else: 

606 assert data.status == Status.OVERRUN 

607 state.overrun_examples += 1 

608 

609 max_valid_draws = 10 

610 max_invalid_draws = 50 

611 max_overrun_draws = 20 

612 

613 assert state.valid_examples <= max_valid_draws 

614 

615 if state.valid_examples == max_valid_draws: 

616 self.health_check_state = None 

617 return 

618 

619 if state.overrun_examples == max_overrun_draws: 

620 fail_health_check( 

621 self.settings, 

622 "Examples routinely exceeded the max allowable size. " 

623 f"({state.overrun_examples} examples overran while generating " 

624 f"{state.valid_examples} valid ones). Generating examples this large " 

625 "will usually lead to bad results. You could try setting max_size " 

626 "parameters on your collections and turning max_leaves down on " 

627 "recursive() calls.", 

628 HealthCheck.data_too_large, 

629 ) 

630 if state.invalid_examples == max_invalid_draws: 

631 fail_health_check( 

632 self.settings, 

633 "It looks like your strategy is filtering out a lot of data. Health " 

634 f"check found {state.invalid_examples} filtered examples but only " 

635 f"{state.valid_examples} good ones. This will make your tests much " 

636 "slower, and also will probably distort the data generation quite a " 

637 "lot. You should adapt your strategy to filter less. This can also " 

638 "be caused by a low max_leaves parameter in recursive() calls", 

639 HealthCheck.filter_too_much, 

640 ) 

641 

642 draw_time = state.total_draw_time 

643 

644 # Allow at least the greater of one second or 5x the deadline. If deadline 

645 # is None, allow 30s - the user can disable the healthcheck too if desired. 

646 draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6)) 

647 if draw_time > max(1.0, draw_time_limit.total_seconds()): 

648 fail_health_check( 

649 self.settings, 

650 "Data generation is extremely slow: Only produced " 

651 f"{state.valid_examples} valid examples in {draw_time:.2f} seconds " 

652 f"({state.invalid_examples} invalid ones and {state.overrun_examples} " 

653 "exceeded maximum size). Try decreasing size of the data you're " 

654 "generating (with e.g. max_size or max_leaves parameters)." 

655 + state.timing_report(), 

656 HealthCheck.too_slow, 

657 ) 

658 

659 def save_buffer( 

660 self, buffer: Union[bytes, bytearray], sub_key: Optional[bytes] = None 

661 ) -> None: 

662 if self.settings.database is not None: 

663 key = self.sub_key(sub_key) 

664 if key is None: 

665 return 

666 self.settings.database.save(key, bytes(buffer)) 

667 

668 def downgrade_buffer(self, buffer: Union[bytes, bytearray]) -> None: 

669 if self.settings.database is not None and self.database_key is not None: 

670 self.settings.database.move(self.database_key, self.secondary_key, buffer) 

671 

672 def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]: 

673 if self.database_key is None: 

674 return None 

675 if sub_key is None: 

676 return self.database_key 

677 return b".".join((self.database_key, sub_key)) 

678 

    @property
    def secondary_key(self) -> Optional[bytes]:
        # Sub-corpus of previously-interesting but non-minimal examples.
        return self.sub_key(b"secondary")

682 

    @property
    def pareto_key(self) -> Optional[bytes]:
        # Sub-corpus persisting the pareto front of targeting scores.
        return self.sub_key(b"pareto")

686 

    def debug(self, message: str) -> None:
        """Report ``message`` when running at debug verbosity or above."""
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

690 

    @property
    def report_debug_info(self) -> bool:
        # Whether debug_data() should do any work at all.
        return self.settings.verbosity >= Verbosity.debug

694 

695 def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None: 

696 if not self.report_debug_info: 

697 return 

698 

699 stack: List[Ls] = [[]] 

700 

701 def go(ex: Example) -> None: 

702 if ex.length == 0: 

703 return 

704 if len(ex.children) == 0: 

705 stack[-1].append(int_from_bytes(data.buffer[ex.start : ex.end])) 

706 else: 

707 node: Ls = [] 

708 stack.append(node) 

709 

710 for v in ex.children: 

711 go(v) 

712 stack.pop() 

713 if len(node) == 1: 

714 stack[-1].extend(node) 

715 else: 

716 stack[-1].append(node) 

717 

718 go(data.examples[0]) 

719 assert len(stack) == 1 

720 

721 status = repr(data.status) 

722 

723 if data.status == Status.INTERESTING: 

724 status = f"{status} ({data.interesting_origin!r})" 

725 

726 self.debug(f"{data.index} bytes {stack[0]!r} -> {status}, {data.output}") 

727 

728 def run(self) -> None: 

729 with local_settings(self.settings): 

730 try: 

731 self._run() 

732 except RunIsComplete: 

733 pass 

734 for v in self.interesting_examples.values(): 

735 self.debug_data(v) 

736 self.debug( 

737 "Run complete after %d examples (%d valid) and %d shrinks" 

738 % (self.call_count, self.valid_examples, self.shrinks) 

739 ) 

740 

741 @property 

742 def database(self) -> Optional[ExampleDatabase]: 

743 if self.database_key is None: 

744 return None 

745 return self.settings.database 

746 

747 def has_existing_examples(self) -> bool: 

748 return self.database is not None and Phase.reuse in self.settings.phases 

749 

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=sort_key
            )
            # Spend only ~10% of the example budget on replay when we will also
            # generate new examples afterwards; otherwise the whole budget.
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=sort_key)
                corpus.extend(extra)

            for existing in corpus:
                data = self.cached_test_function(existing, extend=BUFFER_SIZE)
                if data.status != Status.INTERESTING:
                    # No longer failing: clear it from both corpora.
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=sort_key)

                for existing in pareto_corpus:
                    data = self.cached_test_function(existing, extend=BUFFER_SIZE)
                    if data not in self.pareto_front:
                        # Stale entry no longer on the front: drop it.
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break

819 

820 def exit_with(self, reason: ExitReason) -> None: 

821 if self.ignore_limits: 

822 return 

823 self.statistics["stopped-because"] = reason.describe(self.settings) 

824 if self.best_observed_targets: 

825 self.statistics["targets"] = dict(self.best_observed_targets) 

826 self.debug(f"exit_with({reason.name})") 

827 self.exit_reason = reason 

828 raise RunIsComplete 

829 

830 def should_generate_more(self) -> bool: 

831 # End the generation phase where we would have ended it if no bugs had 

832 # been found. This reproduces the exit logic in `self.test_function`, 

833 # but with the important distinction that this clause will move on to 

834 # the shrinking phase having found one or more bugs, while the other 

835 # will exit having found zero bugs. 

836 if self.valid_examples >= self.settings.max_examples or self.call_count >= max( 

837 self.settings.max_examples * 10, 1000 

838 ): # pragma: no cover 

839 return False 

840 

841 # If we haven't found a bug, keep looking - if we hit any limits on 

842 # the number of tests to run that will raise an exception and stop 

843 # the run. 

844 if not self.interesting_examples: 

845 return True 

846 # Users who disable shrinking probably want to exit as fast as possible. 

847 # If we've found a bug and won't report more than one, stop looking. 

848 elif ( 

849 Phase.shrink not in self.settings.phases 

850 or not self.settings.report_multiple_bugs 

851 ): 

852 return False 

853 assert isinstance(self.first_bug_found_at, int) 

854 assert isinstance(self.last_bug_found_at, int) 

855 assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count 

856 # Otherwise, keep searching for between ten and 'a heuristic' calls. 

857 # We cap 'calls after first bug' so errors are reported reasonably 

858 # soon even for tests that are allowed to run for a very long time, 

859 # or sooner if the latest half of our test effort has been fruitless. 

860 return self.call_count < MIN_TEST_CALLS or self.call_count < min( 

861 self.first_bug_found_at + 1000, self.last_bug_found_at * 2 

862 ) 

863 

    def generate_new_examples(self) -> None:
        """Run the main generation phase.

        Repeatedly creates and executes novel test cases until
        ``should_generate_more`` says to stop, interleaving mutation
        (via ``generate_mutations_from``) and - once enough valid examples
        have been seen - target optimisation.
        """
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        # The all-zeros buffer is our best guess at the smallest test case a
        # strategy can produce; it feeds the large-base-example health check.
        zero_data = self.cached_test_function(bytes(BUFFER_SIZE))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # Pin so the zero example stays in the cache for the whole run.
            self.__data_cache.pin(zero_data.buffer)

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and len(zero_data.buffer) * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        small_example_cap = clamp(10, self.settings.max_examples // 10, 50)

        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1)
        ran_optimisations = False

        while self.should_generate_more():
            # Unfortunately generate_novel_prefix still operates in terms of
            # a buffer and uses HypothesisProvider as its backing provider,
            # not whatever is specified by the backend. We can improve this
            # once more things are on the ir.
            if self.settings.backend != "hypothesis":
                data = self.new_conjecture_data(prefix=b"", max_length=BUFFER_SIZE)
                self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            assert len(prefix) <= BUFFER_SIZE
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + bytes(BUFFER_SIZE - len(prefix))
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the Status code is greater than Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)

                consecutive_zero_extend_is_invalid = 0

                minimal_extension = len(minimal_example.buffer) - len(prefix)

                # Allow an order of magnitude of slack over the estimated
                # minimal extension, capped at the global buffer size.
                max_length = min(len(prefix) + minimal_extension * 10, BUFFER_SIZE)

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that has been discovered.

                trial_data = self.new_conjecture_data(
                    prefix=prefix, max_length=max_length
                )
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.buffer
            else:
                max_length = BUFFER_SIZE

            data = self.new_conjecture_data(prefix=prefix, max_length=max_length)

            self.test_function(data)

            if (
                data.status == Status.OVERRUN
                and max_length < BUFFER_SIZE
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        """Try duplication-style mutations of ``data``'s choice sequence."""
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.examples.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)

                # Pick two distinct (start, end) spans sharing this label.
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if (start1 <= start2 <= end2 <= end1) or (
                    start2 <= start1 <= end1 <= end2
                ):
                    # one example entirely contains the other. give up.
                    # TODO use more intelligent mutation for containment, like
                    # replacing child with parent or vice versa. Would allow for
                    # recursive / subtree mutation
                    failed_mutations += 1
                    continue

                # Normalise so that the first span comes earlier in the nodes.
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)
                assert end1 <= start2

                nodes = data.examples.ir_tree_nodes
                (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                replacement = nodes[start:end]

                try:
                    # We attempt to replace both the examples with
                    # whichever choice we made. Note that this might end
                    # up messing up and getting the example boundaries
                    # wrong - labels matching are only a best guess as to
                    # whether the two are equivalent - but it doesn't
                    # really matter. It may not achieve the desired result,
                    # but it's still a perfectly acceptable choice sequence
                    # to try.
                    new_data = self.cached_test_function_ir(
                        nodes[:start1]
                        + replacement
                        + nodes[end1:start2]
                        + replacement
                        + nodes[end2:],
                        # We set error_on_discard so that we don't end up
                        # entering parts of the tree we consider redundant
                        # and not worth exploring.
                        error_on_discard=True,
                    )
                except ContainsDiscard:
                    failed_mutations += 1
                    continue

                if new_data is Overrun:
                    failed_mutations += 1  # pragma: no cover # annoying case
                else:
                    assert isinstance(new_data, ConjectureResult)
                    # Adopt the mutant as the new base only if it is at least
                    # as good: no status regression, actually different, and
                    # no target score got worse.
                    if (
                        new_data.status >= data.status
                        and data.buffer != new_data.buffer
                        and all(
                            k in new_data.target_observations
                            and new_data.target_observations[k] >= v
                            for k, v in data.target_observations.items()
                        )
                    ):
                        data = new_data
                        failed_mutations = 0
                    else:
                        failed_mutations += 1

    def optimise_targets(self) -> None:
        """If any target observations have been made, attempt to optimise them
        all."""
        if not self.should_optimise:
            return
        from hypothesis.internal.conjecture.optimiser import Optimiser

        # We want to avoid running the optimiser for too long in case we hit
        # an unbounded target score. We start this off fairly conservatively
        # in case interesting examples are easy to find and then ramp it up
        # on an exponential schedule so we don't hamper the optimiser too much
        # if it needs a long time to find good enough improvements.
        max_improvements = 10
        while True:
            prev_calls = self.call_count

            any_improvements = False

            # Hill-climb the current best example for every observed target.
            for target, data in list(self.best_examples_of_observed_targets.items()):
                optimiser = Optimiser(
                    self, data, target, max_improvements=max_improvements
                )
                optimiser.run()
                if optimiser.improvements > 0:
                    any_improvements = True

            # A failing example may have been discovered during optimisation;
            # stop here and let the rest of the run handle it.
            if self.interesting_examples:
                break

            max_improvements *= 2

            if any_improvements:
                continue

            # No per-target progress this round; try the pareto front instead.
            if self.best_observed_targets:
                self.pareto_optimise()

            # If the pareto pass made no test calls either, we're done.
            if prev_calls == self.call_count:
                break

1181 def pareto_optimise(self) -> None: 

1182 if self.pareto_front is not None: 

1183 ParetoOptimiser(self).run() 

1184 

    def _run(self) -> None:
        """Execute the full engine lifecycle: reuse, generate (with targeting),
        shrink, then exit via ``exit_with(ExitReason.finished)``."""
        # have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
        # We normally run the targeting phase mixed in with the generate phase,
        # but if we've been asked to run it but not generation then we have to
        # run it explicitly on its own here.
        if Phase.generate not in self.settings.phases:
            self._current_phase = "target"
            self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)

1205 

1206 def new_conjecture_data_ir( 

1207 self, 

1208 ir_tree_prefix: List[IRNode], 

1209 *, 

1210 observer: Optional[DataObserver] = None, 

1211 max_length: Optional[int] = None, 

1212 ) -> ConjectureData: 

1213 provider = ( 

1214 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1215 ) 

1216 observer = observer or self.tree.new_observer() 

1217 if self.settings.backend != "hypothesis": 

1218 observer = DataObserver() 

1219 

1220 return ConjectureData.for_ir_tree( 

1221 ir_tree_prefix, observer=observer, provider=provider, max_length=max_length 

1222 ) 

1223 

1224 def new_conjecture_data( 

1225 self, 

1226 prefix: Union[bytes, bytearray], 

1227 max_length: int = BUFFER_SIZE, 

1228 observer: Optional[DataObserver] = None, 

1229 ) -> ConjectureData: 

1230 provider = ( 

1231 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1232 ) 

1233 observer = observer or self.tree.new_observer() 

1234 if self.settings.backend != "hypothesis": 

1235 observer = DataObserver() 

1236 

1237 return ConjectureData( 

1238 prefix=prefix, 

1239 max_length=max_length, 

1240 random=self.random, 

1241 observer=observer, 

1242 provider=provider, 

1243 ) 

1244 

1245 def new_conjecture_data_for_buffer( 

1246 self, buffer: Union[bytes, bytearray] 

1247 ) -> ConjectureData: 

1248 return self.new_conjecture_data(buffer, max_length=len(buffer)) 

1249 

    def shrink_interesting_examples(self) -> None:
        """If we've found interesting examples, try to replace each of them
        with a minimal interesting example with the same interesting_origin.

        We may find one or more examples with a new interesting_origin
        during the shrink process. If so we shrink these too.
        """
        if Phase.shrink not in self.settings.phases or not self.interesting_examples:
            return

        self.debug("Shrinking interesting examples")
        self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS

        # Re-run every known failure (smallest buffer first). A failure that
        # no longer reproduces here means the test is flaky, which aborts
        # the run.
        for prev_data in sorted(
            self.interesting_examples.values(), key=lambda d: sort_key(d.buffer)
        ):
            assert prev_data.status == Status.INTERESTING
            data = self.new_conjecture_data_ir(prev_data.examples.ir_tree_nodes)
            self.test_function(data)
            if data.status != Status.INTERESTING:
                self.exit_with(ExitReason.flaky)

        self.clear_secondary_key()

        while len(self.shrunk_examples) < len(self.interesting_examples):
            # Pick the not-yet-shrunk origin whose current example is smallest,
            # breaking ties on the repr of the origin for determinism.
            target, example = min(
                (
                    (k, v)
                    for k, v in self.interesting_examples.items()
                    if k not in self.shrunk_examples
                ),
                key=lambda kv: (sort_key(kv[1].buffer), sort_key(repr(kv[0]))),
            )
            self.debug(f"Shrinking {target!r}")

            if not self.settings.report_multiple_bugs:
                # If multi-bug reporting is disabled, we shrink our currently-minimal
                # failure, allowing 'slips' to any bug with a smaller minimal example.
                self.shrink(example, lambda d: d.status == Status.INTERESTING)
                return

            def predicate(d: ConjectureData) -> bool:
                # Only accept shrinks that reproduce this particular origin.
                if d.status < Status.INTERESTING:
                    return False
                return d.interesting_origin == target

            self.shrink(example, predicate)

            self.shrunk_examples.add(target)

1299 

    def clear_secondary_key(self) -> None:
        """Replay secondary-corpus entries no larger than our current primary
        examples as candidate shrinks, deleting each one that was tried."""
        if self.has_existing_examples():
            # If we have any smaller examples in the secondary corpus, now is
            # a good time to try them to see if they work as shrinks. They
            # probably won't, but it's worth a shot and gives us a good
            # opportunity to clear out the database.

            # It's not worth trying the primary corpus because we already
            # tried all of those in the initial phase.
            corpus = sorted(
                self.settings.database.fetch(self.secondary_key), key=sort_key
            )
            for c in corpus:
                # NOTE: recomputed every iteration on purpose - the
                # cached_test_function call below can change
                # interesting_examples, which moves the cap.
                primary = {v.buffer for v in self.interesting_examples.values()}

                cap = max(map(sort_key, primary))

                # corpus is sorted, so everything past the cap is also larger.
                if sort_key(c) > cap:
                    break
                else:
                    self.cached_test_function(c)
                    # We unconditionally remove c from the secondary key as it
                    # is either now primary or worse than our primary example
                    # of this reason for interestingness.
                    self.settings.database.delete(self.secondary_key, c)

1325 

1326 def shrink( 

1327 self, 

1328 example: Union[ConjectureData, ConjectureResult], 

1329 predicate: Optional[Callable[[ConjectureData], bool]] = None, 

1330 allow_transition: Optional[ 

1331 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1332 ] = None, 

1333 ) -> Union[ConjectureData, ConjectureResult]: 

1334 s = self.new_shrinker(example, predicate, allow_transition) 

1335 s.shrink() 

1336 return s.shrink_target 

1337 

1338 def new_shrinker( 

1339 self, 

1340 example: Union[ConjectureData, ConjectureResult], 

1341 predicate: Optional[Callable[[ConjectureData], bool]] = None, 

1342 allow_transition: Optional[ 

1343 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1344 ] = None, 

1345 ) -> Shrinker: 

1346 return Shrinker( 

1347 self, 

1348 example, 

1349 predicate, 

1350 allow_transition=allow_transition, 

1351 explain=Phase.explain in self.settings.phases, 

1352 in_target_phase=self._current_phase == "target", 

1353 ) 

1354 

    def cached_test_function(
        self,
        buffer: Union[bytes, bytearray],
        *,
        extend: int = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """Checks the tree to see if we've tested this buffer, and returns the
        previous result if we have.

        Otherwise we call through to ``test_function``, and return a
        fresh result.

        If ``extend`` is non-zero, a buffer which would otherwise overrun may
        draw up to ``extend`` extra bytes (total capped at BUFFER_SIZE) in the
        hope of completing the test case.
        """
        buffer = bytes(buffer)[:BUFFER_SIZE]

        max_length = min(BUFFER_SIZE, len(buffer) + extend)

        # Typed identity helper: asserts the invariant that any non-Overrun
        # result we hand back is a ConjectureResult whose status is not
        # OVERRUN.
        @overload
        def check_result(result: _Overrun) -> _Overrun: ...
        @overload
        def check_result(result: ConjectureResult) -> ConjectureResult: ...
        def check_result(
            result: Union[_Overrun, ConjectureResult],
        ) -> Union[_Overrun, ConjectureResult]:
            assert result is Overrun or (
                isinstance(result, ConjectureResult) and result.status != Status.OVERRUN
            )
            return result

        try:
            cached = check_result(self.__data_cache[buffer])
            # A cached overrun might still complete given extra bytes, so we
            # only return it when no extension is allowed.
            if cached.status > Status.OVERRUN or extend == 0:
                return cached
        except KeyError:
            pass

        observer = DataObserver()
        dummy_data = self.new_conjecture_data(
            prefix=buffer, max_length=max_length, observer=observer
        )

        if self.settings.backend == "hypothesis":
            try:
                self.tree.simulate_test_function(dummy_data)
            except PreviouslyUnseenBehaviour:
                pass
            else:
                # The tree fully predicted this execution, so we can answer
                # from the cache without running the test function.
                if dummy_data.status > Status.OVERRUN:
                    dummy_data.freeze()
                    try:
                        return self.__data_cache[dummy_data.buffer]
                    except KeyError:
                        pass
                else:
                    self.__data_cache[buffer] = Overrun
                    return Overrun

        # We didn't find a match in the tree, so we need to run the test
        # function normally. Note that test_function will automatically
        # add this to the tree so we don't need to update the cache.

        result = None

        data = self.new_conjecture_data(
            prefix=max((buffer, dummy_data.buffer), key=len), max_length=max_length
        )
        self.test_function(data)
        result = check_result(data.as_result())
        # Only cache under the original buffer when the result genuinely
        # corresponds to it (i.e. we didn't rely on extension bytes).
        if extend == 0 or (
            result is not Overrun
            and not isinstance(result, _Overrun)
            and len(result.buffer) <= len(buffer)
        ):
            self.__data_cache[buffer] = result
        return result

1436 

1437 def passing_buffers(self, prefix: bytes = b"") -> FrozenSet[bytes]: 

1438 """Return a collection of bytestrings which cause the test to pass. 

1439 

1440 Optionally restrict this by a certain prefix, which is useful for explain mode. 

1441 """ 

1442 return frozenset( 

1443 buf 

1444 for buf in self.__data_cache 

1445 if buf.startswith(prefix) and self.__data_cache[buf].status == Status.VALID 

1446 ) 

1447 

1448 

class ContainsDiscard(Exception):
    """Raised in place of running a test case whose choice sequence is known
    to contain a discarded example (see ``error_on_discard`` callers)."""