
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import textwrap
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Final, List, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyReplay,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observation_callback
from hypothesis.reporting import base_report, report

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling down a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: Final[int] = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: Final[int] = 300

#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: Final[int] = 8 * 1024
CACHE_SIZE: Final[int] = 10000
MIN_TEST_CALLS: Final[int] = 10

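# Shortlex order: sort first by length, then lexicographically, so that e.g.
# shortlex(b"ba") < shortlex(b"aaa"). Shorter database entries sort first,
# which this file uses as a proxy for "simpler (more fully shrunk) example".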

def shortlex(s):
    return (len(s), s)


@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: "defaultdict[str, List[float]]" = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
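        # Flatten the per-argument lists of draw times into a single list
        # (sum with start=[] concatenates), then total them with math.fsum
        # for floating-point accuracy.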

        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)


class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

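    # describe() interpolates the active settings into the reason template:
    # e.g. ExitReason.max_examples.describe(settings) gives
    # "settings.max_examples=100" when settings.max_examples == 100.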

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)


class RunIsComplete(Exception):
    pass

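
# A provider class declares its lifetime: "test_function" providers are
# instantiated once up front (passed None here, since no ConjectureData exists
# yet) and reused across test cases, while "test_case" providers are returned
# as a class and instantiated freshly for each test case.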

def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    mname, cname = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(mname), cname)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )


class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)

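
# Counts the total number of choices, treating a ChoiceTemplate as its `count`
# attribute: e.g. choice_count([0, ChoiceTemplate("simplest", count=3)]) == 4.
# Any template with count=None makes the total unknowable, giving None.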

def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count


class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints


class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        # At runtime, the keys are only ever of type `InterestingOrigin`, but
        # can be `None` during tests.
        self.interesting_examples: dict[
            Optional[InterestingOrigin], ConjectureResult
        ] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        # At runtime, the keys are only ever of type `InterestingOrigin`, but
        # can be `None` during tests.
        self.shrunk_examples: set[Optional[InterestingOrigin]] = set()

        self.health_check_state: Optional[HealthCheckState] = None

        self.tree: DataTree = DataTree()

        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a buffer without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            # We ignore the mypy type error here. Because `phase` and "-phase"
            # are both string literals, the concatenation is always a valid key
            # in the dictionary.
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        we may not raise ``ContainsDiscard`` even if the result has discards,
        if we cannot determine from previous runs whether it will have one.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing
                # extensions of the nodes, it could in fact run to a valid result
                # if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend
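        # For example, with three concrete choices and extend=2, the replay may
        # draw up to five choices before counting as an overrun, while
        # extend="full" removes the cap entirely.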


        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this result
                # for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
            try:
                return self.__data_cache[key]
            except KeyError:
                pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us, for both an ir
        # tree key and a buffer key.
        self.test_function(data)
        return data.as_result()


    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_origin = data.interesting_origin
                initial_traceback = data.expected_traceback
                data = ConjectureData.for_choices(data.choices)
                self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Convert to FlakyFailure on the way out. Should same-origin
                # also be checked?
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    wrapped_tb = (
                        ""
                        if initial_traceback is None
                        else textwrap.indent(initial_traceback, " | ")
                    )
                    raise FlakyReplay(
                        f"Inconsistent results from replaying a failing test case!\n"
                        f"{wrapped_tb}on backend={self.settings.backend!r} but "
                        f"{desc_new_status} under backend='hypothesis'",
                        interesting_origins=[initial_origin],
                    )

                self._cache(data)

            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)


    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Examples routinely exceeded the max allowable size. "
                f"({state.overrun_examples} examples overran while generating "
                f"{state.valid_examples} valid ones). Generating examples this large "
                "will usually lead to bad results. You could try setting max_size "
                "parameters on your collections and turning max_leaves down on "
                "recursive() calls.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like your strategy is filtering out a lot of data. Health "
                f"check found {state.invalid_examples} filtered examples but only "
                f"{state.valid_examples} good ones. This will make your tests much "
                "slower, and also will probably distort the data generation quite a "
                "lot. You should adapt your strategy to filter less. This can also "
                "be caused by a low max_leaves parameter in recursive() calls",
                HealthCheck.filter_too_much,
            )

        draw_time = state.total_draw_time

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if draw_time > max(1.0, draw_time_limit.total_seconds()):
            fail_health_check(
                self.settings,
                "Data generation is extremely slow: Only produced "
                f"{state.valid_examples} valid examples in {draw_time:.2f} seconds "
                f"({state.invalid_examples} invalid ones and {state.overrun_examples} "
                "exceeded maximum size). Try decreasing size of the data you're "
                "generating (with e.g. max_size or max_leaves parameters)."
                + state.timing_report(),
                HealthCheck.too_slow,
            )

    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

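    # sub_key namespaces database entries under the main key: sub_key(None)
    # returns the database key itself, while e.g. sub_key(b"pareto") gives
    # b"<database_key>.pareto". All return None if there is no database key.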

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))

    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observation_callback(on_observation)
        return nullcontext()

    def run(self) -> None:
        with local_settings(self.settings):
            # NOTE: For compatibility with Python 3.9's LL(1)
            # parser, this is written as a nested with-statement,
            # instead of a compound one.
            with self.observe_for_provider():
                try:
                    self._run()
                except RunIsComplete:
                    pass
                for v in self.interesting_examples.values():
                    self.debug_data(v)
                self.debug(
                    "Run complete after %d examples (%d valid) and %d shrinks"
                    % (self.call_count, self.valid_examples, self.shrinks)
                )


    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break


    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
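        # For example, with the first bug found at call 50 and the latest at
        # call 300, we keep generating until call 600 == min(50 + 1000, 2 * 300).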

        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )

    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, and so it's
            # only a very slight performance loss to simply not pin it if doing
            # so would error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
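        # e.g. with max_examples=100 the small-example phase applies while we
        # have at most 10 valid examples, and the optimisation pass starts once
        # 50 valid examples have been seen.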

        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the status is at least Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #                 ┌─────┐
                    #          ┌──────┤  a  ├───────┐
                    #          │      └─────┘       │
                    #       ┌──┴──┐              ┌──┴──┐
                    #   ┌───┤  b  ├───┐      ┌───┤  c  ├───┐
                    #   │   └──┬──┘   │      │   └──┬──┘   │
                    # ┌─┴─┐  ┌─┴─┐  ┌─┴─┐  ┌─┴─┐  ┌─┴─┐  ┌─┴─┐
                    # │ d │  │ e │  │ f │  │ g │  │ h │  │ i │
                    # └───┘  └───┘  └───┘  └───┘  └───┘  └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace a with c (replace child with parent)
                    # 2. replace c with a (replace parent with child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #                     ┌─────┐
                    #          ┌──────────┤  a  ├───────────┐
                    #          │          └─────┘           │
                    #       ┌──┴──┐                      ┌──┴──┐
                    #   ┌───┤  b  ├───┐          ┌───────┤  a  ├──────┐
                    #   │   └──┬──┘   │          │       └─────┘      │
                    # ┌─┴─┐  ┌─┴─┐  ┌─┴─┐     ┌──┴──┐              ┌──┴──┐
                    # │ d │  │ e │  │ f │ ┌───┤  b  ├───┐      ┌───┤  c  ├───┐
                    # └───┘  └───┘  └───┘ │   └──┬──┘   │      │   └──┬──┘   │
                    #                   ┌─┴─┐  ┌─┴─┐  ┌─┴─┐  ┌─┴─┐  ┌─┴─┐  ┌─┴─┐
                    #                   │ d │  │ e │  │ f │  │ g │  │ h │  │ i │
                    #                   └───┘  └───┘  └───┘  └───┘  └───┘  └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #       ┌─────┐
                    #   ┌───┤  c  ├───┐
                    #   │   └──┬──┘   │
                    # ┌─┴─┐  ┌─┴─┐  ┌─┴─┐
                    # │ g │  │ h │  │ i │
                    # └───┘  └───┘  └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]
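                    # e.g. with (start1, end1) == (2, 8) and (start2, end2) == (4, 6),
                    # this yields choices[:4] + choices[2:], duplicating choices[2:4].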

                else:
                    (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                    replacement = data.choices[start:end]
                    # We attempt to replace both the examples with
                    # whichever choice we made. Note that this might end
                    # up messing up and getting the example boundaries
                    # wrong - labels matching are only a best guess as to
                    # whether the two are equivalent - but it doesn't
                    # really matter. It may not achieve the desired result,
                    # but it's still a perfectly acceptable choice sequence
                    # to try.
                    attempt = (
                        data.choices[:start1]
                        + replacement
                        + data.choices[end1:start2]
                        + replacement
                        + data.choices[end2:]
                    )

                try:
                    new_data = self.cached_test_function(
                        attempt,
                        # We set error_on_discard so that we don't end up
                        # entering parts of the tree we consider redundant
                        # and not worth exploring.
                        error_on_discard=True,
                    )
                except ContainsDiscard:
                    failed_mutations += 1
                    continue

                if new_data is Overrun:
                    failed_mutations += 1  # pragma: no cover # annoying case
                else:
                    assert isinstance(new_data, ConjectureResult)
                    if (
                        new_data.status >= data.status
                        and choices_key(data.choices) != choices_key(new_data.choices)
                        and all(
                            k in new_data.target_observations
                            and new_data.target_observations[k] >= v
                            for k, v in data.target_observations.items()
                        )
                    ):
                        data = new_data
                        failed_mutations = 0
                    else:
                        failed_mutations += 1

    def optimise_targets(self) -> None:
        """If any target observations have been made, attempt to optimise them
        all."""
        if not self.should_optimise:
            return
        from hypothesis.internal.conjecture.optimiser import Optimiser

        # We want to avoid running the optimiser for too long in case we hit
        # an unbounded target score. We start this off fairly conservatively
        # in case interesting examples are easy to find and then ramp it up
        # on an exponential schedule so we don't hamper the optimiser too much
        # if it needs a long time to find good enough improvements.
        max_improvements = 10
        while True:
            prev_calls = self.call_count

            any_improvements = False

            for target, data in list(self.best_examples_of_observed_targets.items()):
                optimiser = Optimiser(
                    self, data, target, max_improvements=max_improvements
                )
                optimiser.run()
                if optimiser.improvements > 0:
                    any_improvements = True

            if self.interesting_examples:
                break

            max_improvements *= 2

            if any_improvements:
                continue

            if self.best_observed_targets:
                self.pareto_optimise()

            if prev_calls == self.call_count:
                break

    def pareto_optimise(self) -> None:
        if self.pareto_front is not None:
            ParetoOptimiser(self).run()

    def _run(self) -> None:
        # have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
            # Fast path for development: If the database gave us interesting
            # examples from the previously stored primary key, don't try
            # shrinking it again as it's unlikely to work.
            if self.reused_previously_shrunk_test_case:
                self.exit_with(ExitReason.finished)
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
            # We normally run the targeting phase mixed in with the generate phase,
            # but if we've been asked to run it but not generation then we have to
            # run it explicitly on its own here.
            if Phase.generate not in self.settings.phases:
                self._current_phase = "target"
                self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)

    def new_conjecture_data(
        self,
        prefix: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        observer: Optional[DataObserver] = None,
        max_choices: Optional[int] = None,
    ) -> ConjectureData:
        provider = (
            HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
        )
        observer = observer or self.tree.new_observer()
        if not self.using_hypothesis_backend:
            observer = DataObserver()

        return ConjectureData(
            prefix=prefix,
            observer=observer,
            provider=provider,
            max_choices=max_choices,
            random=self.random,
        )

    def shrink_interesting_examples(self) -> None:
        """If we've found interesting examples, try to replace each of them
        with a minimal interesting example with the same interesting_origin.

        We may find one or more examples with a new interesting_origin
        during the shrink process. If so we shrink these too.
        """
        if Phase.shrink not in self.settings.phases or not self.interesting_examples:
            return

        self.debug("Shrinking interesting examples")
        self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS

        for prev_data in sorted(
            self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
        ):
            assert prev_data.status == Status.INTERESTING
            data = self.new_conjecture_data(prev_data.choices)
            self.test_function(data)
            if data.status != Status.INTERESTING:
                self.exit_with(ExitReason.flaky)

        self.clear_secondary_key()

        while len(self.shrunk_examples) < len(self.interesting_examples):
            target, example = min(
                (
                    (k, v)
                    for k, v in self.interesting_examples.items()
                    if k not in self.shrunk_examples
                ),
                key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
            )
            self.debug(f"Shrinking {target!r}: {example.choices}")

            if not self.settings.report_multiple_bugs:
                # If multi-bug reporting is disabled, we shrink our currently-minimal
                # failure, allowing 'slips' to any bug with a smaller minimal example.
                self.shrink(example, lambda d: d.status == Status.INTERESTING)
                return

            def predicate(d: Union[ConjectureResult, _Overrun]) -> bool:
                if d.status < Status.INTERESTING:
                    return False
                d = cast(ConjectureResult, d)
                return d.interesting_origin == target

            self.shrink(example, predicate)

            self.shrunk_examples.add(target)

    def clear_secondary_key(self) -> None:
        if self.has_existing_examples():
            # If we have any smaller examples in the secondary corpus, now is
            # a good time to try them to see if they work as shrinks. They
            # probably won't, but it's worth a shot and gives us a good
            # opportunity to clear out the database.

            # It's not worth trying the primary corpus because we already
            # tried all of those in the initial phase.
            corpus = sorted(
                self.settings.database.fetch(self.secondary_key), key=shortlex
            )
            for c in corpus:
                choices = choices_from_bytes(c)
                if choices is None:
                    self.settings.database.delete(self.secondary_key, c)
                    continue
                primary = {
                    choices_to_bytes(v.choices)
                    for v in self.interesting_examples.values()
                }
                if shortlex(c) > max(map(shortlex, primary)):
                    break

                self.cached_test_function(choices)
                # We unconditionally remove c from the secondary key as it
                # is either now primary or worse than our primary example
                # of this reason for interestingness.
                self.settings.database.delete(self.secondary_key, c)

    def shrink(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Union[ConjectureData, ConjectureResult]:
        s = self.new_shrinker(example, predicate, allow_transition)
        s.shrink()
        return s.shrink_target

    def new_shrinker(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Shrinker:
        return Shrinker(
            self,
            example,
            predicate,
            allow_transition=allow_transition,
            explain=Phase.explain in self.settings.phases,
            in_target_phase=self._current_phase == "target",
        )

    def passing_choice_sequences(
        self, prefix: Sequence[ChoiceNode] = ()
    ) -> frozenset[tuple[ChoiceNode, ...]]:
        """Return a collection of choice sequence nodes which cause the test to pass.
        Optionally restrict this by a certain prefix, which is useful for explain mode.
        """
        return frozenset(
            cast(ConjectureResult, result).nodes
            for key in self.__data_cache
            if (result := self.__data_cache[key]).status is Status.VALID
            and startswith(cast(ConjectureResult, result).nodes, prefix)
        )


class ContainsDiscard(Exception):
    pass