
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Final, List, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyBackendFailure,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observation_callback
from hypothesis.reporting import base_report, report

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling into a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but would take a
#: substantial amount of time to finish completely.
MAX_SHRINKS: Final[int] = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: Final[int] = 300

#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: Final[int] = 8 * 1024
CACHE_SIZE: Final[int] = 10000
MIN_TEST_CALLS: Final[int] = 10


def shortlex(s):
    return (len(s), s)
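
# A worked example (illustrative, not part of the engine API): shortlex orders
# by length first and only then lexicographically, so
#
#     sorted([b"aa", b"z", b"b"], key=shortlex) == [b"b", b"z", b"aa"]
#
# which is why the shortlex-smallest database entry is the most-shrunk failure.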



@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: "defaultdict[str, List[float]]" = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)
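
    # Rows are sorted by total time spent under each draw label; the rendered
    # report looks roughly like this (illustrative output only):
    #
    #           count | fraction | slowest draws (seconds)
    #      n  |  980 |     71% |  --    --    0.101, 0.132, 0.512
    #    text |  980 |     29% |  --    --    0.033, 0.060, 0.094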



class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
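
# For example (illustrative): with settings(max_examples=100),
# ExitReason.max_examples.describe(settings) returns
# "settings.max_examples=100", by str.format-ing the enum value.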



class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    mname, cname = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(mname), cname)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
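
# A sketch of the lookup above (assuming the AVAILABLE_PROVIDERS entry format,
# a mapping from backend name to the dotted path of its provider class):
#
#     AVAILABLE_PROVIDERS["hypothesis"]
#     == "hypothesis.internal.conjecture.providers.HypothesisProvider"
#
# _get_provider then imports that module and either instantiates the class once
# per test function (lifetime "test_function") or returns the class itself, to
# be instantiated freshly for each test case (lifetime "test_case").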



class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
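
# For example (illustrative): each concrete choice counts as one, a template
# contributes its declared count, and an unbounded template makes the total
# unknowable:
#
#     choice_count([True, 0.0]) == 2
#     choice_count([True, ChoiceTemplate("simplest", count=3)]) == 4
#     choice_count([ChoiceTemplate("simplest", count=None)]) is None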



class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints



class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        # At runtime, the keys are only ever of type `InterestingOrigin`, but
        # can be `None` during tests.
        self.shrunk_examples: set[Optional[InterestingOrigin]] = set()

        self.health_check_state: Optional[HealthCheckState] = None

        self.tree: DataTree = DataTree()

        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a buffer without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None


    @contextmanager
    def _with_switch_to_hypothesis_provider(
        self, value: bool
    ) -> Generator[None, None, None]:
        previous = self._switch_to_hypothesis_provider
        try:
            self._switch_to_hypothesis_provider = value
            yield
        finally:
            self._switch_to_hypothesis_provider = previous

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            # We ignore the mypy type error here. Because `phase` is a string
            # literal and "-phase" is a string literal as well, the concatenation
            # will always be a valid key in the dictionary.
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing
                # extensions of the nodes, it could in fact run to valid data
                # if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend
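
        # e.g. (illustrative): for choices (True, 0) with extend=2, any
        # re-execution below is capped at choice_count(choices) + extend == 4
        # choices, while extend="full" leaves the length uncapped.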


        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this
                # result for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
        try:
            return self.__data_cache[key]
        except KeyError:
            pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us, for both an ir
        # tree key and a buffer key.
        self.test_function(data)
        return data.as_result()


    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)
                # we're already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.
                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

                self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)


    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Examples routinely exceeded the max allowable size. "
                f"({state.overrun_examples} examples overran while generating "
                f"{state.valid_examples} valid ones). Generating examples this large "
                "will usually lead to bad results. You could try setting max_size "
                "parameters on your collections and turning max_leaves down on "
                "recursive() calls.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like your strategy is filtering out a lot of data. Health "
                f"check found {state.invalid_examples} filtered examples but only "
                f"{state.valid_examples} good ones. This will make your tests much "
                "slower, and also will probably distort the data generation quite a "
                "lot. You should adapt your strategy to filter less. This can also "
                "be caused by a low max_leaves parameter in recursive() calls",
                HealthCheck.filter_too_much,
            )

        draw_time = state.total_draw_time

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if draw_time > max(1.0, draw_time_limit.total_seconds()):
            fail_health_check(
                self.settings,
                "Data generation is extremely slow: Only produced "
                f"{state.valid_examples} valid examples in {draw_time:.2f} seconds "
                f"({state.invalid_examples} invalid ones and {state.overrun_examples} "
                "exceeded maximum size). Try decreasing size of the data you're "
                "generating (with e.g. max_size or max_leaves parameters)."
                + state.timing_report(),
                HealthCheck.too_slow,
            )

    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))
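
    # For example (illustrative, with a made-up key): if database_key is
    # b"deadbeef", then sub_key(None) == b"deadbeef" and
    # sub_key(b"secondary") == b"deadbeef.secondary"; with no database_key,
    # every sub key is None.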


    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observation_callback(on_observation)
        return nullcontext()

    def run(self) -> None:
        with local_settings(self.settings):
            # NOTE: For compatibility with Python 3.9's LL(1)
            # parser, this is written as a nested with-statement,
            # instead of a compound one.
            with self.observe_for_provider():
                try:
                    self._run()
                except RunIsComplete:
                    pass
                for v in self.interesting_examples.values():
                    self.debug_data(v)
                self.debug(
                    "Run complete after %d examples (%d valid) and %d shrinks"
                    % (self.call_count, self.valid_examples, self.shrinks)
                )

    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases


    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break


    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )
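
    # A worked example of the heuristic above (illustrative): if the first bug
    # was found on call 50 and the latest on call 400, generation continues
    # while call_count < min(50 + 1000, 400 * 2) == 800, i.e. we allow roughly
    # as much further effort as it took to find the most recent bug.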


    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, so it's only a
            # very slight performance loss to not pin it if doing so would error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the Status code is greater than Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()


    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #                  ┌─────┐
                    #            ┌─────┤  a  ├─────┐
                    #            │     └─────┘     │
                    #         ┌──┴──┐           ┌──┴──┐
                    #      ┌──┤  b  ├──┐     ┌──┤  c  ├──┐
                    #      │  └──┬──┘  │     │  └──┬──┘  │
                    #    ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #    │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
                    #    └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace a with c (replace child with parent)
                    # 2. replace c with a (replace parent with child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #                  ┌─────┐
                    #            ┌─────┤  a  ├──────────────┐
                    #            │     └─────┘              │
                    #         ┌──┴──┐                     ┌─┴───┐
                    #      ┌──┤  b  ├──┐            ┌─────┤  a  ├─────┐
                    #      │  └──┬──┘  │            │     └─────┘     │
                    #    ┌─┴─┐ ┌─┴─┐ ┌─┴─┐       ┌──┴──┐           ┌──┴──┐
                    #    │ d │ │ e │ │ f │    ┌──┤  b  ├──┐     ┌──┤  c  ├──┐
                    #    └───┘ └───┘ └───┘    │  └──┬──┘  │     │  └──┬──┘  │
                    #                       ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #                       │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
                    #                       └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #       ┌─────┐
                    #    ┌──┤  c  ├──┐
                    #    │  └──┬──┘  │
                    #  ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #  │ g │ │ h │ │ i │
                    #  └───┘ └───┘ └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]
                else:
                    (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                    replacement = data.choices[start:end]
                    # We attempt to replace both the examples with
                    # whichever choice we made. Note that this might end
                    # up messing up and getting the example boundaries
                    # wrong - labels matching are only a best guess as to
                    # whether the two are equivalent - but it doesn't
                    # really matter. It may not achieve the desired result,
                    # but it's still a perfectly acceptable choice sequence
                    # to try.
                    attempt = (
                        data.choices[:start1]
                        + replacement
                        + data.choices[end1:start2]
                        + replacement
                        + data.choices[end2:]
                    )

                try:
                    new_data = self.cached_test_function(
                        attempt,
                        # We set error_on_discard so that we don't end up
                        # entering parts of the tree we consider redundant
                        # and not worth exploring.
                        error_on_discard=True,
                    )
                except ContainsDiscard:
                    failed_mutations += 1
                    continue

                if new_data is Overrun:
                    failed_mutations += 1  # pragma: no cover # annoying case
                else:
                    assert isinstance(new_data, ConjectureResult)
                    if (
                        new_data.status >= data.status
                        and choices_key(data.choices) != choices_key(new_data.choices)
                        and all(
                            k in new_data.target_observations
                            and new_data.target_observations[k] >= v
                            for k, v in data.target_observations.items()
                        )
                    ):
                        data = new_data
                        failed_mutations = 0
                    else:
                        failed_mutations += 1
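
    # A worked example of the two mutation shapes above (illustrative): given
    # choices (a, b, c, d, e) and same-label spans at [1, 3) and [3, 5), the
    # replacement case picks one span, say (d, e), and builds
    #     choices[:1] + (d, e) + choices[3:3] + (d, e) + choices[5:]
    #     == (a, d, e, d, e)
    # while the nested case with spans [0, 5) and [3, 5) duplicates the parent's
    # prefix: choices[:3] + choices[0:] == (a, b, c, a, b, c, d, e).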

1377 

1378 def optimise_targets(self) -> None: 

1379 """If any target observations have been made, attempt to optimise them 

1380 all.""" 

1381 if not self.should_optimise: 

1382 return 

1383 from hypothesis.internal.conjecture.optimiser import Optimiser 

1384 

1385 # We want to avoid running the optimiser for too long in case we hit 

1386 # an unbounded target score. We start this off fairly conservatively 

1387 # in case interesting examples are easy to find and then ramp it up 

1388 # on an exponential schedule so we don't hamper the optimiser too much 

1389 # if it needs a long time to find good enough improvements. 

1390 max_improvements = 10 

1391 while True: 

1392 prev_calls = self.call_count 

1393 

1394 any_improvements = False 

1395 

1396 for target, data in list(self.best_examples_of_observed_targets.items()): 

1397 optimiser = Optimiser( 

1398 self, data, target, max_improvements=max_improvements 

1399 ) 

1400 optimiser.run() 

1401 if optimiser.improvements > 0: 

1402 any_improvements = True 

1403 

1404 if self.interesting_examples: 

1405 break 

1406 

1407 max_improvements *= 2 

1408 

1409 if any_improvements: 

1410 continue 

1411 

1412 if self.best_observed_targets: 

1413 self.pareto_optimise() 

1414 
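# If nothing above made any new test-function calls, another pass
# cannot make progress either, so stop.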

1415 if prev_calls == self.call_count: 

1416 break 

1417 

1418 def pareto_optimise(self) -> None: 

1419 if self.pareto_front is not None: 

1420 ParetoOptimiser(self).run() 

1421 

1422 def _run(self) -> None: 

1423 # have to use the primitive provider to interpret database bits... 

1424 self._switch_to_hypothesis_provider = True 

1425 with self._log_phase_statistics("reuse"): 

1426 self.reuse_existing_examples() 

1427 # Fast path for development: if the database gave us interesting

1428 # examples from the previously stored primary key, don't try

1429 # shrinking them again, as that's unlikely to work.

1430 if self.reused_previously_shrunk_test_case: 

1431 self.exit_with(ExitReason.finished) 

1432 # ...but we should use the supplied provider when generating... 

1433 self._switch_to_hypothesis_provider = False 

1434 with self._log_phase_statistics("generate"): 

1435 self.generate_new_examples() 

1436 # We normally run the targeting phase mixed in with the generate phase,

1437 # but if we've been asked to run targeting without generation then we

1438 # have to run it explicitly on its own here.

1439 if Phase.generate not in self.settings.phases: 

1440 self._current_phase = "target" 

1441 self.optimise_targets() 

1442 # ...and back to the primitive provider when shrinking. 

1443 self._switch_to_hypothesis_provider = True 

1444 with self._log_phase_statistics("shrink"): 

1445 self.shrink_interesting_examples() 

1446 self.exit_with(ExitReason.finished) 

1447 

1448 def new_conjecture_data( 

1449 self, 

1450 prefix: Sequence[Union[ChoiceT, ChoiceTemplate]], 

1451 *, 

1452 observer: Optional[DataObserver] = None, 

1453 max_choices: Optional[int] = None, 

1454 ) -> ConjectureData: 

1455 provider = ( 

1456 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1457 ) 

1458 observer = observer or self.tree.new_observer() 

1459 if not self.using_hypothesis_backend: 

1460 observer = DataObserver() 
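
# (Assumption from context: the DataTree observer records choices in
# order to steer Hypothesis's own provider away from previously-seen
# behaviour, which doesn't apply to alternative backends, so those
# get a no-op DataObserver instead.)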

1461 

1462 return ConjectureData( 

1463 prefix=prefix, 

1464 observer=observer, 

1465 provider=provider, 

1466 max_choices=max_choices, 

1467 random=self.random, 

1468 ) 

1469 

1470 def shrink_interesting_examples(self) -> None: 

1471 """If we've found interesting examples, try to replace each of them 

1472 with a minimal interesting example with the same interesting_origin. 

1473 

1474 We may find one or more examples with a new interesting_origin 

1475 during the shrink process. If so, we shrink those too.

1476 """ 

1477 if Phase.shrink not in self.settings.phases or not self.interesting_examples: 

1478 return 

1479 

1480 self.debug("Shrinking interesting examples") 

1481 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS 

1482 

1483 for prev_data in sorted( 

1484 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes) 

1485 ): 

1486 assert prev_data.status == Status.INTERESTING 
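
# Re-run each interesting example once; if it no longer fails,
# the failure is flaky and we give up rather than shrink an
# unreliable reproduction.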

1487 data = self.new_conjecture_data(prev_data.choices) 

1488 self.test_function(data) 

1489 if data.status != Status.INTERESTING: 

1490 self.exit_with(ExitReason.flaky) 

1491 

1492 self.clear_secondary_key() 

1493 

1494 while len(self.shrunk_examples) < len(self.interesting_examples): 

1495 target, example = min( 

1496 ( 

1497 (k, v) 

1498 for k, v in self.interesting_examples.items() 

1499 if k not in self.shrunk_examples 

1500 ), 

1501 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))), 

1502 ) 

1503 self.debug(f"Shrinking {target!r}: {example.choices}") 

1504 

1505 if not self.settings.report_multiple_bugs: 

1506 # If multi-bug reporting is disabled, we shrink our currently-minimal 

1507 # failure, allowing 'slips' to any bug with a smaller minimal example. 

1508 self.shrink(example, lambda d: d.status == Status.INTERESTING) 

1509 return 

1510 

1511 def predicate(d: Union[ConjectureResult, _Overrun]) -> bool: 

1512 if d.status < Status.INTERESTING: 

1513 return False 

1514 d = cast(ConjectureResult, d) 

1515 return d.interesting_origin == target 
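
# For example (illustrative): if shrinking an input that raised
# ValueError yields one that raises KeyError instead, the predicate
# rejects it, keeping each shrink pinned to its own
# interesting_origin.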

1516 

1517 self.shrink(example, predicate) 

1518 

1519 self.shrunk_examples.add(target) 

1520 

1521 def clear_secondary_key(self) -> None: 

1522 if self.has_existing_examples(): 

1523 # If we have any smaller examples in the secondary corpus, now is 

1524 # a good time to try them to see if they work as shrinks. They 

1525 # probably won't, but it's worth a shot and gives us a good 

1526 # opportunity to clear out the database. 

1527 

1528 # It's not worth trying the primary corpus because we already 

1529 # tried all of those in the initial phase. 

1530 corpus = sorted( 

1531 self.settings.database.fetch(self.secondary_key), key=shortlex 

1532 ) 

1533 for c in corpus: 

1534 choices = choices_from_bytes(c) 

1535 if choices is None: 

1536 self.settings.database.delete(self.secondary_key, c) 

1537 continue 

1538 primary = { 

1539 choices_to_bytes(v.choices) 

1540 for v in self.interesting_examples.values() 

1541 } 
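
# (shortlex orders by length, then lexicographically; since the
# corpus above is sorted by it, once an entry exceeds every primary
# example, nothing later in the corpus can be an improvement.)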

1542 if shortlex(c) > max(map(shortlex, primary)): 

1543 break 

1544 

1545 self.cached_test_function(choices) 

1546 # We unconditionally remove c from the secondary key as it

1547 # is either now primary or worse than our primary example

1548 # for this interesting origin.

1549 self.settings.database.delete(self.secondary_key, c) 

1550 

1551 def shrink( 

1552 self, 

1553 example: Union[ConjectureData, ConjectureResult], 

1554 predicate: Optional[ShrinkPredicateT] = None, 

1555 allow_transition: Optional[ 

1556 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1557 ] = None, 

1558 ) -> Union[ConjectureData, ConjectureResult]: 

1559 s = self.new_shrinker(example, predicate, allow_transition) 

1560 s.shrink() 

1561 return s.shrink_target 

1562 

1563 def new_shrinker( 

1564 self, 

1565 example: Union[ConjectureData, ConjectureResult], 

1566 predicate: Optional[ShrinkPredicateT] = None, 

1567 allow_transition: Optional[ 

1568 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1569 ] = None, 

1570 ) -> Shrinker: 

1571 return Shrinker( 

1572 self, 

1573 example, 

1574 predicate, 

1575 allow_transition=allow_transition, 

1576 explain=Phase.explain in self.settings.phases, 

1577 in_target_phase=self._current_phase == "target", 

1578 ) 

1579 

1580 def passing_choice_sequences( 

1581 self, prefix: Sequence[ChoiceNode] = () 

1582 ) -> frozenset[tuple[ChoiceNode, ...]]: 

1583 """Return a collection of choice sequence nodes which cause the test to pass. 

1584 Optionally restrict this by a certain prefix, which is useful for explain mode. 

1585 """ 

1586 return frozenset( 

1587 cast(ConjectureResult, result).nodes 

1588 for key in self.__data_cache 

1589 if (result := self.__data_cache[key]).status is Status.VALID 

1590 and startswith(cast(ConjectureResult, result).nodes, prefix) 

1591 ) 

1592 

1593 

1594class ContainsDiscard(Exception): 

1595 pass