Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/engine.py: 17%


702 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import threading
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Final, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyBackendFailure,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observability_callback
from hypothesis.reporting import base_report, report


#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling into a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: Final[int] = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: Final[int] = 300
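
# Illustrative note (not part of the original source): per the comment above,
# a user who really needs to keep shrinking can monkeypatch this module-level
# constant, e.g.
#
#   import hypothesis.internal.conjecture.engine as engine
#   engine.MAX_SHRINKING_SECONDS = 3600  # keep shrinking for up to an hour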


#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: Final[int] = 8 * 1024
CACHE_SIZE: Final[int] = 10000
MIN_TEST_CALLS: Final[int] = 10


def shortlex(s):
    return (len(s), s)
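
# Illustrative example (not in the original source): shortlex sorts first by
# length and then lexicographically, so
#   sorted([b"bb", b"a", b"c"], key=shortlex) == [b"a", b"c", b"bb"]
# which is why the smallest database entry is the most-shrunk failure.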



@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: defaultdict[str, list[float]] = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times below 0.5ms to focus on
            # slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)



class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
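
    # Illustrative example (not in the original source): with the default
    # settings (max_examples=100),
    #   ExitReason.max_examples.describe(settings) == "settings.max_examples=100"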



class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    mname, cname = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(mname), cname)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
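
# Illustrative note (not in the original source): AVAILABLE_PROVIDERS maps a
# backend name to the dotted path of its provider class, along the lines of
#   {"hypothesis": "hypothesis.internal.conjecture.providers.HypothesisProvider"}
# so _get_provider imports the module and either instantiates the class now
# (for "test_function" lifetimes) or returns the class to be instantiated once
# per test case (for "test_case" lifetimes).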



class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
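
# Illustrative example (not in the original source): concrete choices count as
# one each, templates contribute their count, and an unbounded template makes
# the total indeterminate:
#   choice_count([1, True]) == 2
#   choice_count([1, ChoiceTemplate("simplest", count=3)]) == 4
#   choice_count([ChoiceTemplate("simplest", count=None)]) is None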



class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints



class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
        thread_overlap: Optional[dict[int, bool]] = None,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        # At runtime, the keys are only ever of type `InterestingOrigin`, but
        # they can be `None` during tests.
        self.shrunk_examples: set[Optional[InterestingOrigin]] = set()

        self.health_check_state: Optional[HealthCheckState] = None

        self.tree: DataTree = DataTree()

        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a choice sequence without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None


    @contextmanager
    def _with_switch_to_hypothesis_provider(
        self, value: bool
    ) -> Generator[None, None, None]:
        previous = self._switch_to_hypothesis_provider
        try:
            self._switch_to_hypothesis_provider = value
            yield
        finally:
            self._switch_to_hypothesis_provider = previous

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            # We ignore the mypy type error here. Because `phase` is a string
            # literal and "-phase" is a string literal as well, the concatenation
            # will always be a valid key in the dictionary.
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }


    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)


    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards, if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing
                # extensions of the nodes, it could in fact run to a valid result
                # if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this
                # result for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
            try:
                return self.__data_cache[key]
            except KeyError:
                pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us.
        self.test_function(data)
        return data.as_result()


    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)
                # we're already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.
                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

                self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

        if self.shrinks >= MAX_SHRINKS:
            self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress.  When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples.  Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)


    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)


    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Generated inputs routinely consumed more than the maximum "
                f"allowed entropy: {state.valid_examples} inputs were generated "
                f"successfully, while {state.overrun_examples} inputs exceeded the "
                f"maximum allowed entropy during generation."
                "\n\n"
                f"Testing with inputs this large tends to be slow, and to produce "
                "failures that are both difficult to shrink and difficult to understand. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n\n"
                "If you expect the average size of your input to be this large, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like this test is filtering out a lot of inputs. "
                f"{state.valid_examples} inputs were generated successfully, "
                f"while {state.invalid_examples} inputs were filtered out. "
                "\n\n"
                "An input might be filtered out by calls to assume(), "
                "strategy.filter(...), or occasionally by Hypothesis internals."
                "\n\n"
                "Applying this much filtering makes input generation slow, since "
                "Hypothesis must discard filtered-out inputs and try generating "
                "new ones. It is also possible that applying this much "
                "filtering will distort the domain and/or distribution of the test, "
                "leaving your testing less rigorous than expected."
                "\n\n"
                "If you expect this many inputs to be filtered out during generation, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.filter_too_much,
            )

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time = state.total_draw_time
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if (
            draw_time > max(1.0, draw_time_limit.total_seconds())
            # we disable HealthCheck.too_slow under concurrent threads, since
            # cpython may switch away from a thread for arbitrarily long.
            and not self.thread_overlap.get(threading.get_ident(), False)
        ):
            extra_str = []
            if state.invalid_examples:
                extra_str.append(f"{state.invalid_examples} invalid inputs")
            if state.overrun_examples:
                extra_str.append(
                    f"{state.overrun_examples} inputs which exceeded the "
                    "maximum allowed entropy"
                )
            extra_str = ", and ".join(extra_str)
            extra_str = f" ({extra_str})" if extra_str else ""

            fail_health_check(
                self.settings,
                "Input generation is slow: Hypothesis only generated "
                f"{state.valid_examples} valid inputs after {draw_time:.2f} "
                f"seconds{extra_str}."
                "\n" + state.timing_report() + "\n\n"
                "This could be for a few reasons:"
                "\n"
                "1. This strategy could be generating too much data per input. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n"
                "2. Some other expensive computation could be running during input "
                "generation. For example, "
                "if @st.composite or st.data() is interspersed with an expensive "
                "computation, HealthCheck.too_slow is likely to trigger. If this "
                "computation is unrelated to input generation, move it elsewhere. "
                "Otherwise, try making it more efficient, or disable this health "
                "check if that is not possible."
                "\n\n"
                "If you expect input generation to take this long, you can disable "
                "this health check with "
                "@settings(suppress_health_check=[HealthCheck.too_slow]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.too_slow,
            )


    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))
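
    # Illustrative example (not in the original source): with
    # database_key == b"abc123", sub_key(None) returns b"abc123", while
    # sub_key(b"secondary") returns b"abc123.secondary" - the dotted keys
    # used for the secondary and pareto corpora below.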


    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observability_callback(on_observation)
        return nullcontext()


    def run(self) -> None:
        with local_settings(self.settings):
            # NOTE: For compatibility with Python 3.9's LL(1)
            # parser, this is written as a nested with-statement,
            # instead of a compound one.
            with self.observe_for_provider():
                try:
                    self._run()
                except RunIsComplete:
                    pass
                for v in self.interesting_examples.values():
                    self.debug_data(v)
                self.debug(
                    "Run complete after %d examples (%d valid) and %d shrinks"
                    % (self.call_count, self.valid_examples, self.shrinks)
                )

    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases


    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break


    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete


    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )
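
    # Illustrative example (not in the original source): if the first bug was
    # found at call 50 and the most recent at call 200, generation continues
    # while call_count < min(50 + 1000, 200 * 2) == 400, after which we move
    # on to shrinking.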


    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, and so
            # it's only a very slight performance loss to simply not pin it
            # if doing so would error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural input for this test is very "
                "large. This makes it difficult for Hypothesis to generate "
                "good inputs, especially when trying to shrink failing inputs."
                "\n\n"
                "Consider reducing the amount of data generated by the strategy. "
                "Also consider introducing small alternative values for some "
                "strategies. For example, could you "
                "mark some arguments as optional by replacing `some_complex_strategy` "
                "with `st.none() | some_complex_strategy`?"
                "\n\n"
                "If you are confident that the size of the smallest natural input "
                "to your test cannot be reduced, you can suppress this health check "
                "with @settings(suppress_health_check=[HealthCheck.large_base_example]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the status is at least Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it, not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

1281 

1282 def generate_mutations_from( 

1283 self, data: Union[ConjectureData, ConjectureResult] 

1284 ) -> None: 

1285 # A thing that is often useful but rarely happens by accident is 

1286 # to generate the same value at multiple different points in the 

1287 # test case. 

1288 # 

1289 # Rather than make this the responsibility of individual strategies 

1290 # we implement a small mutator that just takes parts of the test 

1291 # case with the same label and tries replacing one of them with a 

1292 # copy of the other and tries running it. If we've made a good 

1293 # guess about what to put where, this will run a similar generated 

1294 # test case with more duplication. 

1295 if ( 

1296 # An OVERRUN doesn't have enough information about the test 

1297 # case to mutate, so we just skip those. 

1298 data.status >= Status.INVALID 

1299 # This has a tendency to trigger some weird edge cases during 

1300 # generation so we don't let it run until we're done with the 

1301 # health checks. 

1302 and self.health_check_state is None 

1303 ): 

1304 initial_calls = self.call_count 

1305 failed_mutations = 0 

1306 

1307 while ( 

1308 self.should_generate_more() 

1309 # We implement fairly conservative checks for how long we 

1310 # we should run mutation for, as it's generally not obvious 

1311 # how helpful it is for any given test case. 

1312 and self.call_count <= initial_calls + 5 

1313 and failed_mutations <= 5 

1314 ): 

1315 groups = data.spans.mutator_groups 

1316 if not groups: 

1317 break 

1318 

1319 group = self.random.choice(groups) 

1320 (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2) 

1321 if start1 > start2: 

1322 (start1, end1), (start2, end2) = (start2, end2), (start1, end1) 

1323 

1324 if ( 

1325 start1 <= start2 <= end2 <= end1 

1326 ): # pragma: no cover # flaky on conjecture-cover tests 

1327 # One span entirely contains the other. The strategy is very 

1328 # likely some kind of tree. e.g. we might have 

1329 # 

1330 # ┌─────┐ 

1331 # ┌─────┤ a ├──────┐ 

1332 # │ └─────┘ │ 

1333 # ┌──┴──┐ ┌──┴──┐ 

1334 # ┌──┤ b ├──┐ ┌──┤ c ├──┐ 

1335 # │ └──┬──┘ │ │ └──┬──┘ │ 

1336 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ 

1337 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │ 

1338 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ 

1339 # 

1340 # where each node is drawn from the same strategy and so 

1341 # has the same span label. We might have selected the spans 

1342 # corresponding to the a and c nodes, which is the entire 

1343 # tree and the subtree of (and including) c respectively. 

1344 # 

1345 # There are two possible mutations we could apply in this case: 

1346 # 1. replace a with c (replace child with parent) 

1347 # 2. replace c with a (replace parent with child) 

1348 # 

1349 # (1) results in multiple partial copies of the 

1350 # parent: 

1351 # ┌─────┐ 

1352 # ┌─────┤ a ├────────────┐ 

1353 # │ └─────┘ │ 

1354 # ┌──┴──┐ ┌─┴───┐ 

1355 # ┌──┤ b ├──┐ ┌─────┤ a ├──────┐ 

1356 # │ └──┬──┘ │ │ └─────┘ │ 

1357 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌──┴──┐ ┌──┴──┐ 

1358 # │ d │ │ e │ │ f │ ┌──┤ b ├──┐ ┌──┤ c ├──┐ 

1359 # └───┘ └───┘ └───┘ │ └──┬──┘ │ │ └──┬──┘ │ 

1360 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ 

1361 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │ 

1362 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ 

1363 # 

1364 # While (2) results in truncating part of the parent: 

1365 # 

1366 # ┌─────┐ 

1367 # ┌──┤ c ├──┐ 

1368 # │ └──┬──┘ │ 

1369 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ 

1370 # │ g │ │ h │ │ i │ 

1371 # └───┘ └───┘ └───┘ 

1372 # 

1373 # (1) is the same as Example IV.4. in Nautilus (NDSS '19) 

1374 # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf), 

1375 # except we do not repeat the replacement additional times 

1376 # (the paper repeats it once for a total of two copies). 

1377 # 

1378 # We currently only apply mutation (1), and ignore mutation 

1379 # (2). The reason is that the attempt generated from (2) is 

1380 # always something that Hypothesis could easily have generated 

1381 # itself, by simply not making various choices. Whereas 

1382 # duplicating the exact value + structure of particular choices 

1383 # in (1) would have been hard for Hypothesis to generate by 

1384 # chance. 

1385 # 

1386 # TODO: an extension of this mutation might repeat (1) on 

1387 # a geometric distribution between 0 and ~10 times. We would 

1388 # need to find the corresponding span to recurse on in the new 

1389 # choices, probably just by using the choices index. 

1390 

1391 # case (1): duplicate the choices in start1:start2. 

1392 attempt = data.choices[:start2] + data.choices[start1:] 

1393 else: 

1394 (start, end) = self.random.choice([(start1, end1), (start2, end2)]) 

1395 replacement = data.choices[start:end] 

1396                     # We attempt to replace both spans with 

1397                     # whichever choice we made. Note that this might get 

1398                     # the span boundaries 

1399                     # wrong - label matching is only a best guess as to 

1400 # whether the two are equivalent - but it doesn't 

1401 # really matter. It may not achieve the desired result, 

1402 # but it's still a perfectly acceptable choice sequence 

1403 # to try. 

1404 attempt = ( 

1405 data.choices[:start1] 

1406 + replacement 

1407 + data.choices[end1:start2] 

1408 + replacement 

1409 + data.choices[end2:] 

1410 ) 
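
                # Editorial sketch, not part of the engine: the two attempts
                # above, traced on a plain tuple of choices. `choices`,
                # `start1`, `end1`, `start2`, and `end2` are hypothetical
                # stand-ins for data.choices and the sampled span boundaries.
                #
                #     choices = ("a", "b", "c", "d", "e")
                #
                #     # nested case (1): outer span [0, 5), inner span [3, 5).
                #     # Duplicating choices[start1:start2] yields partial
                #     # copies of the parent:
                #     start1, start2 = 0, 3
                #     assert choices[:start2] + choices[start1:] == (
                #         "a", "b", "c", "a", "b", "c", "d", "e",
                #     )
                #
                #     # disjoint case: spans [0, 2) and [3, 5). Both spans are
                #     # replaced by the same replacement, here taken from the
                #     # second span:
                #     start1, end1, start2, end2 = 0, 2, 3, 5
                #     replacement = choices[start2:end2]  # ("d", "e")
                #     assert (
                #         choices[:start1] + replacement
                #         + choices[end1:start2] + replacement
                #         + choices[end2:]
                #     ) == ("d", "e", "c", "d", "e")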

1411 

1412 try: 

1413 new_data = self.cached_test_function( 

1414 attempt, 

1415 # We set error_on_discard so that we don't end up 

1416 # entering parts of the tree we consider redundant 

1417 # and not worth exploring. 

1418 error_on_discard=True, 

1419 ) 

1420 except ContainsDiscard: 

1421 failed_mutations += 1 

1422 continue 

1423 

1424 if new_data is Overrun: 

1425 failed_mutations += 1 # pragma: no cover # annoying case 

1426 else: 

1427 assert isinstance(new_data, ConjectureResult) 

1428 if ( 

1429 new_data.status >= data.status 

1430 and choices_key(data.choices) != choices_key(new_data.choices) 

1431 and all( 

1432 k in new_data.target_observations 

1433 and new_data.target_observations[k] >= v 

1434 for k, v in data.target_observations.items() 

1435 ) 

1436 ): 

1437 data = new_data 

1438 failed_mutations = 0 

1439 else: 

1440 failed_mutations += 1 
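
            # The acceptance rule above, restated as a standalone sketch
            # (assumed names, not engine API): a mutant replaces the current
            # test case only if it regresses on no axis we track.
            #
            #     def accepts(old, new):
            #         return (
            #             # status must not regress...
            #             new.status >= old.status
            #             # ...the choice sequence must actually be new...
            #             and choices_key(old.choices) != choices_key(new.choices)
            #             # ...and every observed target score must hold or improve.
            #             and all(
            #                 k in new.target_observations
            #                 and new.target_observations[k] >= v
            #                 for k, v in old.target_observations.items()
            #             )
            #         )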

1441 

1442 def optimise_targets(self) -> None: 

1443 """If any target observations have been made, attempt to optimise them 

1444 all.""" 

1445 if not self.should_optimise: 

1446 return 

1447 from hypothesis.internal.conjecture.optimiser import Optimiser 

1448 

1449 # We want to avoid running the optimiser for too long in case we hit 

1450 # an unbounded target score. We start this off fairly conservatively 

1451 # in case interesting examples are easy to find and then ramp it up 

1452 # on an exponential schedule so we don't hamper the optimiser too much 

1453 # if it needs a long time to find good enough improvements. 

1454 max_improvements = 10 

1455 while True: 

1456 prev_calls = self.call_count 

1457 

1458 any_improvements = False 

1459 

1460 for target, data in list(self.best_examples_of_observed_targets.items()): 

1461 optimiser = Optimiser( 

1462 self, data, target, max_improvements=max_improvements 

1463 ) 

1464 optimiser.run() 

1465 if optimiser.improvements > 0: 

1466 any_improvements = True 

1467 

1468 if self.interesting_examples: 

1469 break 

1470 

1471 max_improvements *= 2 

1472 

1473 if any_improvements: 

1474 continue 

1475 

1476 if self.best_observed_targets: 

1477 self.pareto_optimise() 

1478 

1479 if prev_calls == self.call_count: 

1480 break 
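
        # Editorial trace of the schedule above: the per-target improvement
        # budget doubles after every pass, so successive passes get budgets
        # of 10, 20, 40, 80, ... The loop stops once a full pass issues no
        # calls at all (prev_calls == self.call_count) or as soon as an
        # interesting example appears.
        #
        #     budgets = [10 * 2**i for i in range(5)]
        #     assert budgets == [10, 20, 40, 80, 160]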

1481 

1482 def pareto_optimise(self) -> None: 

1483 if self.pareto_front is not None: 

1484 ParetoOptimiser(self).run() 
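
    # For context, an editorial sketch of the usual dominance rule underlying
    # a pareto front (the real logic lives in conjecture.pareto): a result
    # stays on the front unless some other result is at least as good on
    # every target and strictly better on at least one.
    #
    #     def dominates(a: dict, b: dict) -> bool:
    #         return all(a[k] >= b[k] for k in b) and any(a[k] > b[k] for k in b)
    #
    #     assert dominates({"depth": 3, "width": 5}, {"depth": 2, "width": 5})
    #     assert not dominates({"depth": 3, "width": 4}, {"depth": 2, "width": 5})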

1485 

1486 def _run(self) -> None: 

1487 # have to use the primitive provider to interpret database bits... 

1488 self._switch_to_hypothesis_provider = True 

1489 with self._log_phase_statistics("reuse"): 

1490 self.reuse_existing_examples() 

1491 # Fast path for development: If the database gave us interesting 

1492 # examples from the previously stored primary key, don't try 

1493            # shrinking them again, as it's unlikely to make further progress. 

1494 if self.reused_previously_shrunk_test_case: 

1495 self.exit_with(ExitReason.finished) 

1496 # ...but we should use the supplied provider when generating... 

1497 self._switch_to_hypothesis_provider = False 

1498 with self._log_phase_statistics("generate"): 

1499 self.generate_new_examples() 

1500 # We normally run the targeting phase mixed in with the generate phase, 

1501        # but if we've been asked to run targeting without generation, we have to 

1502 # run it explicitly on its own here. 

1503 if Phase.generate not in self.settings.phases: 

1504 self._current_phase = "target" 

1505 self.optimise_targets() 

1506 # ...and back to the primitive provider when shrinking. 

1507 self._switch_to_hypothesis_provider = True 

1508 with self._log_phase_statistics("shrink"): 

1509 self.shrink_interesting_examples() 

1510 self.exit_with(ExitReason.finished) 
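
    # Editorial summary of the provider switching above (no additional logic):
    #
    #     phase     provider used
    #     --------  ------------------------------------------------------
    #     reuse     HypothesisProvider (replays choices from the database)
    #     generate  the configured backend (self.provider)
    #     target    the configured backend (runs within the generate block)
    #     shrink    HypothesisProvider (replays known choice sequences)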

1511 

1512 def new_conjecture_data( 

1513 self, 

1514 prefix: Sequence[Union[ChoiceT, ChoiceTemplate]], 

1515 *, 

1516 observer: Optional[DataObserver] = None, 

1517 max_choices: Optional[int] = None, 

1518 ) -> ConjectureData: 

1519 provider = ( 

1520 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1521 ) 

1522 observer = observer or self.tree.new_observer() 
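
        # The base DataObserver is a no-op, so with a non-Hypothesis backend
        # we deliberately skip recording choices into our DataTree.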

1523 if not self.using_hypothesis_backend: 

1524 observer = DataObserver() 

1525 

1526 return ConjectureData( 

1527 prefix=prefix, 

1528 observer=observer, 

1529 provider=provider, 

1530 max_choices=max_choices, 

1531 random=self.random, 

1532 ) 
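
    # Hypothetical usage sketch (names assumed, not public API): replay a
    # known prefix of choices, then let the provider fill in the rest up
    # to a cap.
    #
    #     data = runner.new_conjecture_data(
    #         prefix=(True, 255, b"\x00"),  # ChoiceT values replayed first
    #         max_choices=100,
    #     )
    #     runner.test_function(data)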

1533 

1534 def shrink_interesting_examples(self) -> None: 

1535 """If we've found interesting examples, try to replace each of them 

1536 with a minimal interesting example with the same interesting_origin. 

1537 

1538 We may find one or more examples with a new interesting_origin 

1539 during the shrink process. If so we shrink these too. 

1540 """ 

1541 if Phase.shrink not in self.settings.phases or not self.interesting_examples: 

1542 return 

1543 

1544 self.debug("Shrinking interesting examples") 

1545 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS 

1546 

1547 for prev_data in sorted( 

1548 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes) 

1549 ): 

1550 assert prev_data.status == Status.INTERESTING 

1551 data = self.new_conjecture_data(prev_data.choices) 

1552 self.test_function(data) 

1553 if data.status != Status.INTERESTING: 

1554 self.exit_with(ExitReason.flaky) 

1555 

1556 self.clear_secondary_key() 

1557 

1558 while len(self.shrunk_examples) < len(self.interesting_examples): 

1559 target, example = min( 

1560 ( 

1561 (k, v) 

1562 for k, v in self.interesting_examples.items() 

1563 if k not in self.shrunk_examples 

1564 ), 

1565 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))), 

1566 ) 

1567 self.debug(f"Shrinking {target!r}: {example.choices}") 

1568 

1569 if not self.settings.report_multiple_bugs: 

1570 # If multi-bug reporting is disabled, we shrink our currently-minimal 

1571 # failure, allowing 'slips' to any bug with a smaller minimal example. 

1572 self.shrink(example, lambda d: d.status == Status.INTERESTING) 

1573 return 

1574 

1575 def predicate(d: Union[ConjectureResult, _Overrun]) -> bool: 

1576 if d.status < Status.INTERESTING: 

1577 return False 

1578 d = cast(ConjectureResult, d) 

1579 return d.interesting_origin == target 

1580 

1581 self.shrink(example, predicate) 

1582 

1583 self.shrunk_examples.add(target) 
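
    # The min(...) above makes shrinking order deterministic: pick the
    # failure whose current example is smallest (sort_key over its nodes),
    # breaking ties by shortlex(repr(origin)). Editorial sketch of the
    # tiebreak, assuming shortlex orders by length first, then contents:
    #
    #     origins = ["ZeroDivisionError", "KeyError"]
    #     assert min(origins, key=lambda s: (len(s), s)) == "KeyError"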

1584 

1585 def clear_secondary_key(self) -> None: 

1586 if self.has_existing_examples(): 

1587 # If we have any smaller examples in the secondary corpus, now is 

1588 # a good time to try them to see if they work as shrinks. They 

1589 # probably won't, but it's worth a shot and gives us a good 

1590 # opportunity to clear out the database. 

1591 

1592 # It's not worth trying the primary corpus because we already 

1593 # tried all of those in the initial phase. 

1594 corpus = sorted( 

1595 self.settings.database.fetch(self.secondary_key), key=shortlex 

1596 ) 

1597 for c in corpus: 

1598 choices = choices_from_bytes(c) 

1599 if choices is None: 

1600 self.settings.database.delete(self.secondary_key, c) 

1601 continue 

1602 primary = { 

1603 choices_to_bytes(v.choices) 

1604 for v in self.interesting_examples.values() 

1605 } 

1606 if shortlex(c) > max(map(shortlex, primary)): 

1607 break 

1608 

1609 self.cached_test_function(choices) 

1610                # We unconditionally remove c from the secondary key: it is 

1611                # either now primary, or worse than our primary example for 

1612                # this interesting origin. 

1613 self.settings.database.delete(self.secondary_key, c) 
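
    # The walk above relies on shortlex putting shorter entries first, so we
    # can stop as soon as an entry is larger than every primary example.
    # Editorial sketch, assuming the usual shortlex definition (length first,
    # then contents):
    #
    #     key = lambda b: (len(b), b)
    #     assert sorted([b"zz", b"a", b"ba"], key=key) == [b"a", b"ba", b"zz"]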

1614 

1615 def shrink( 

1616 self, 

1617 example: Union[ConjectureData, ConjectureResult], 

1618 predicate: Optional[ShrinkPredicateT] = None, 

1619 allow_transition: Optional[ 

1620 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1621 ] = None, 

1622 ) -> Union[ConjectureData, ConjectureResult]: 

1623 s = self.new_shrinker(example, predicate, allow_transition) 

1624 s.shrink() 

1625 return s.shrink_target 

1626 

1627 def new_shrinker( 

1628 self, 

1629 example: Union[ConjectureData, ConjectureResult], 

1630 predicate: Optional[ShrinkPredicateT] = None, 

1631 allow_transition: Optional[ 

1632 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1633 ] = None, 

1634 ) -> Shrinker: 

1635 return Shrinker( 

1636 self, 

1637 example, 

1638 predicate, 

1639 allow_transition=allow_transition, 

1640 explain=Phase.explain in self.settings.phases, 

1641 in_target_phase=self._current_phase == "target", 

1642 ) 
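
    # Hypothetical usage sketch (names assumed, not public API): shrink while
    # holding the failure's origin fixed, mirroring the predicate built in
    # shrink_interesting_examples above.
    #
    #     def same_bug(d):
    #         return (
    #             d.status == Status.INTERESTING
    #             and d.interesting_origin == target_origin  # assumed variable
    #         )
    #
    #     shrunk = runner.shrink(example, same_bug)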

1643 

1644 def passing_choice_sequences( 

1645 self, prefix: Sequence[ChoiceNode] = () 

1646 ) -> frozenset[tuple[ChoiceNode, ...]]: 

1647 """Return a collection of choice sequence nodes which cause the test to pass. 

1648 Optionally restrict this by a certain prefix, which is useful for explain mode. 

1649 """ 

1650 return frozenset( 

1651 cast(ConjectureResult, result).nodes 

1652 for key in self.__data_cache 

1653 if (result := self.__data_cache[key]).status is Status.VALID 

1654 and startswith(cast(ConjectureResult, result).nodes, prefix) 

1655 ) 
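
    # Hypothetical usage sketch for explain mode (names assumed): gather the
    # node sequences of passing runs that share a failing example's prefix,
    # to contrast them against the failure.
    #
    #     passing = runner.passing_choice_sequences(prefix=shrunk.nodes[:2])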

1656 

1657 

1658class ContainsDiscard(Exception): 

1659 pass
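
# ContainsDiscard is raised via cached_test_function(..., error_on_discard=True)
# so that mutation attempts which re-enter discarded, redundant parts of the
# tree are abandoned early; the mutator above catches it and counts a failed
# mutation.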