Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/engine.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

740 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import importlib 

12import math 

13import threading 

14import time 

15from collections import defaultdict 

16from collections.abc import Callable, Generator, Sequence 

17from contextlib import AbstractContextManager, contextmanager, nullcontext 

18from dataclasses import dataclass, field 

19from datetime import timedelta 

20from enum import Enum 

21from random import Random 

22from typing import Literal, NoReturn, cast 

23 

24from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings 

25from hypothesis._settings import local_settings 

26from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes 

27from hypothesis.errors import ( 

28 BackendCannotProceed, 

29 FlakyBackendFailure, 

30 FlakyStrategyDefinition, 

31 HypothesisException, 

32 InvalidArgument, 

33 StopTest, 

34) 

35from hypothesis.internal.cache import LRUReusedCache 

36from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override 

37from hypothesis.internal.conjecture.choice import ( 

38 ChoiceConstraintsT, 

39 ChoiceKeyT, 

40 ChoiceNode, 

41 ChoiceT, 

42 ChoiceTemplate, 

43 choices_key, 

44) 

45from hypothesis.internal.conjecture.data import ( 

46 ConjectureData, 

47 ConjectureResult, 

48 DataObserver, 

49 Overrun, 

50 Status, 

51 _Overrun, 

52) 

53from hypothesis.internal.conjecture.datatree import ( 

54 DataTree, 

55 PreviouslyUnseenBehaviour, 

56 TreeRecordingObserver, 

57) 

58from hypothesis.internal.conjecture.junkdrawer import ( 

59 ensure_free_stackframes, 

60 startswith, 

61) 

62from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser 

63from hypothesis.internal.conjecture.providers import ( 

64 AVAILABLE_PROVIDERS, 

65 HypothesisProvider, 

66 PrimitiveProvider, 

67) 

68from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key 

69from hypothesis.internal.escalation import InterestingOrigin 

70from hypothesis.internal.healthcheck import fail_health_check 

71from hypothesis.internal.observability import Observation, with_observability_callback 

72from hypothesis.reporting import base_report, report, verbose_report 

73 

74# In most cases, the following constants are all Final. However, we do allow users 

75# to monkeypatch all of these variables, which means we cannot annotate them as 

76# Final or mypyc will inline them and render monkeypatching useless. 

77 

78#: The maximum number of times the shrinker will reduce the complexity of a failing 

79#: input before giving up. This avoids falling down a trap of exponential (or worse) 

80#: complexity, where the shrinker appears to be making progress but will take a 

81#: substantially long time to finish completely. 

82MAX_SHRINKS: int = 500 

83 

84# If the shrinking phase takes more than five minutes, abort it early and print 

85# a warning. Many CI systems will kill a build after around ten minutes with 

86# no output, and appearing to hang isn't great for interactive use either - 

87# showing partially-shrunk examples is better than quitting with no examples! 

88# (but make it monkeypatchable, for the rare users who need to keep on shrinking) 

89 

90#: The maximum total time in seconds that the shrinker will try to shrink a failure 

91#: for before giving up. This is across all shrinks for the same failure, so even 

92#: if the shrinker successfully reduces the complexity of a single failure several 

93#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken. 

94MAX_SHRINKING_SECONDS: int = 300 

95 

96#: The maximum amount of entropy a single test case can use before giving up 

97#: while making random choices during input generation. 

98#: 

99#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you 

100#: should not rely on it, except that a linear increase |BUFFER_SIZE| will linearly 

101#: increase the amount of entropy a test case can use during generation. 

102BUFFER_SIZE: int = 8 * 1024 

103CACHE_SIZE: int = 10000 

104MIN_TEST_CALLS: int = 10 

105 

106# we use this to isolate Hypothesis from interacting with the global random, 

107# to make it easier to reason about our global random warning logic (see 

108# deprecate_random_in_strategy). 

109_random = Random() 

110 

111 

112def shortlex(s): 

113 return (len(s), s) 

114 

115 

116@dataclass(slots=True, frozen=False) 

117class HealthCheckState: 

118 valid_examples: int = field(default=0) 

119 invalid_examples: int = field(default=0) 

120 overrun_examples: int = field(default=0) 

121 draw_times: defaultdict[str, list[float]] = field( 

122 default_factory=lambda: defaultdict(list) 

123 ) 

124 

125 @property 

126 def total_draw_time(self) -> float: 

127 return math.fsum(sum(self.draw_times.values(), start=[])) 

128 

129 def timing_report(self) -> str: 

130 """Return a terminal report describing what was slow.""" 

131 if not self.draw_times: 

132 return "" 

133 width = max( 

134 len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times 

135 ) 

136 out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"] 

137 args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1])) 

138 for i, (argname, times) in enumerate(args_in_order): # pragma: no branch 

139 # If we have very many unique keys, which can happen due to interactive 

140 # draws with computed labels, we'll skip uninformative rows. 

141 if ( 

142 5 <= i < (len(self.draw_times) - 2) 

143 and math.fsum(times) * 20 < self.total_draw_time 

144 ): 

145 out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)") 

146 break 

147 # Compute the row to report, omitting times <1ms to focus on slow draws 

148 reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4] 

149 desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",") 

150 arg = argname.removeprefix("generate:").removesuffix(": ") 

151 out.append( 

152 f" {arg:^{width}} | {len(times):>4} | " 

153 f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}" 

154 ) 

155 return "\n".join(out) 

156 

157 

158def _invalid_thresholds(*, r: float, c: float) -> tuple[int, int]: 

159 base = math.ceil(math.log(1 - c) / math.log(1 - r)) - 1 

160 per_p = math.ceil(1 / r) 

161 return base, per_p 

162 

163 

164# stop once we're 99% confident the true valid rate is below 1%. See 

165# https://github.com/HypothesisWorks/hypothesis/issues/4623#issuecomment-3814681997 

166# for how we derived this formula. 

167INVALID_THRESHOLD_BASE, INVALID_PER_VALID = _invalid_thresholds(r=0.01, c=0.99) 

168 

169 

170class ExitReason(Enum): 

171 max_examples = "settings.max_examples={s.max_examples}" 

172 max_iterations = ( 

173 "settings.max_examples={s.max_examples}, " 

174 "but < 1% of examples satisfied assumptions" 

175 ) 

176 max_shrinks = f"shrunk example {MAX_SHRINKS} times" 

177 finished = "nothing left to do" 

178 flaky = "test was flaky" 

179 very_slow_shrinking = "shrinking was very slow" 

180 

181 def describe(self, settings: Settings) -> str: 

182 return self.value.format(s=settings) 

183 

184 

185class RunIsComplete(Exception): 

186 pass 

187 

188 

189def _get_provider(backend: str) -> PrimitiveProvider | type[PrimitiveProvider]: 

190 provider_cls = AVAILABLE_PROVIDERS[backend] 

191 if isinstance(provider_cls, str): 

192 module_name, class_name = provider_cls.rsplit(".", 1) 

193 provider_cls = getattr(importlib.import_module(module_name), class_name) 

194 

195 if provider_cls.lifetime == "test_function": 

196 return provider_cls(None) 

197 elif provider_cls.lifetime == "test_case": 

198 return provider_cls 

199 else: 

200 raise InvalidArgument( 

201 f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. " 

202 "Expected one of 'test_function', 'test_case'." 

203 ) 

204 

205 

206class CallStats(TypedDict): 

207 status: str 

208 runtime: float 

209 drawtime: float 

210 gctime: float 

211 events: list[str] 

212 

213 

214PhaseStatistics = TypedDict( 

215 "PhaseStatistics", 

216 { 

217 "duration-seconds": float, 

218 "test-cases": list[CallStats], 

219 "distinct-failures": int, 

220 "shrinks-successful": int, 

221 }, 

222) 

223StatisticsDict = TypedDict( 

224 "StatisticsDict", 

225 { 

226 "generate-phase": NotRequired[PhaseStatistics], 

227 "reuse-phase": NotRequired[PhaseStatistics], 

228 "shrink-phase": NotRequired[PhaseStatistics], 

229 "explain-phase": NotRequired[PhaseStatistics], 

230 "stopped-because": NotRequired[str], 

231 "targets": NotRequired[dict[str, float]], 

232 "nodeid": NotRequired[str], 

233 }, 

234) 

235 

236 

237def choice_count(choices: Sequence[ChoiceT | ChoiceTemplate]) -> int | None: 

238 count = 0 

239 for choice in choices: 

240 if isinstance(choice, ChoiceTemplate): 

241 if choice.count is None: 

242 return None 

243 count += choice.count 

244 else: 

245 count += 1 

246 return count 

247 

248 

249class DiscardObserver(DataObserver): 

250 @override 

251 def kill_branch(self) -> NoReturn: 

252 raise ContainsDiscard 

253 

254 

255def realize_choices(data: ConjectureData, *, for_failure: bool) -> None: 

256 for node in data.nodes: 

257 value = data.provider.realize(node.value, for_failure=for_failure) 

258 expected_type = { 

259 "string": str, 

260 "float": float, 

261 "integer": int, 

262 "boolean": bool, 

263 "bytes": bytes, 

264 }[node.type] 

265 if type(value) is not expected_type: 

266 raise HypothesisException( 

267 f"expected {expected_type} from " 

268 f"{data.provider.realize.__qualname__}, got {type(value)}" 

269 ) 

270 

271 constraints = cast( 

272 ChoiceConstraintsT, 

273 { 

274 k: data.provider.realize(v, for_failure=for_failure) 

275 for k, v in node.constraints.items() 

276 }, 

277 ) 

278 node.value = value 

279 node.constraints = constraints 

280 

281 

282class ConjectureRunner: 

283 def __init__( 

284 self, 

285 test_function: Callable[[ConjectureData], None], 

286 *, 

287 settings: Settings | None = None, 

288 random: Random | None = None, 

289 database_key: bytes | None = None, 

290 ignore_limits: bool = False, 

291 thread_overlap: dict[int, bool] | None = None, 

292 ) -> None: 

293 self._test_function: Callable[[ConjectureData], None] = test_function 

294 self.settings: Settings = settings or Settings() 

295 self.shrinks: int = 0 

296 self.finish_shrinking_deadline: float | None = None 

297 self.call_count: int = 0 

298 self.misaligned_count: int = 0 

299 self.valid_examples: int = 0 

300 self.invalid_examples: int = 0 

301 self.overrun_examples: int = 0 

302 self.random: Random = random or Random(_random.getrandbits(128)) 

303 self.database_key: bytes | None = database_key 

304 self.ignore_limits: bool = ignore_limits 

305 self.thread_overlap = {} if thread_overlap is None else thread_overlap 

306 

307 # Global dict of per-phase statistics, and a list of per-call stats 

308 # which transfer to the global dict at the end of each phase. 

309 self._current_phase: str = "(not a phase)" 

310 self.statistics: StatisticsDict = {} 

311 self.stats_per_test_case: list[CallStats] = [] 

312 # Time spent in any nested phase, so the enclosing phase can exclude it. 

313 self._nested_phase_seconds: float = 0.0 

314 

315 self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {} 

316 # We use call_count because there may be few possible valid_examples. 

317 self.first_bug_found_at: int | None = None 

318 self.last_bug_found_at: int | None = None 

319 self.first_bug_found_time: float = math.inf 

320 

321 self.shrunk_examples: set[InterestingOrigin] = set() 

322 self.health_check_state: HealthCheckState | None = None 

323 self.tree: DataTree = DataTree() 

324 self.provider: PrimitiveProvider | type[PrimitiveProvider] = _get_provider( 

325 self.settings.backend 

326 ) 

327 

328 self.best_observed_targets: defaultdict[str, float] = defaultdict( 

329 lambda: NO_SCORE 

330 ) 

331 self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {} 

332 

333 # We keep the pareto front in the example database if we have one. This 

334 # is only marginally useful at present, but speeds up local development 

335 # because it means that large targets will be quickly surfaced in your 

336 # testing. 

337 self.pareto_front: ParetoFront | None = None 

338 if self.database_key is not None and self.settings.database is not None: 

339 self.pareto_front = ParetoFront(self.random) 

340 self.pareto_front.on_evict(self.on_pareto_evict) 

341 

342 # We want to be able to get the ConjectureData object that results 

343 # from running a choice sequence without recalculating, especially during 

344 # shrinking where we need to know about the structure of the 

345 # executed test case. 

346 self.__data_cache = LRUReusedCache[ 

347 tuple[ChoiceKeyT, ...], ConjectureResult | _Overrun 

348 ](CACHE_SIZE) 

349 

350 self.reused_previously_shrunk_test_case: bool = False 

351 

352 self.__pending_call_explanation: str | None = None 

353 self._backend_found_failure: bool = False 

354 self._backend_exceeded_deadline: bool = False 

355 self._backend_discard_count: int = 0 

356 # note unsound verification by alt backends 

357 self._verified_by_backend: str | None = None 

358 self._switch_to_hypothesis_provider: bool = False 

359 

360 @contextmanager 

361 def _with_switch_to_hypothesis_provider( 

362 self, value: bool 

363 ) -> Generator[None, None, None]: 

364 previous = self._switch_to_hypothesis_provider 

365 try: 

366 self._switch_to_hypothesis_provider = value 

367 yield 

368 finally: 

369 self._switch_to_hypothesis_provider = previous 

370 

371 @property 

372 def using_hypothesis_backend(self) -> bool: 

373 return ( 

374 self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider 

375 ) 

376 

377 def explain_next_call_as(self, explanation: str) -> None: 

378 self.__pending_call_explanation = explanation 

379 

380 def clear_call_explanation(self) -> None: 

381 self.__pending_call_explanation = None 

382 

383 @contextmanager 

384 def _log_phase_statistics( 

385 self, phase: Literal["reuse", "generate", "shrink", "explain"] 

386 ) -> Generator[None, None, None]: 

387 # Phases may nest - the explain phase runs inside the shrink phase - so 

388 # we save and restore the per-call stats and current phase, exclude the 

389 # duration of any nested phase, and accumulate when a phase is entered 

390 # more than once (the explain phase runs once per shrinking target). 

391 saved_stats = self.stats_per_test_case 

392 saved_phase = self._current_phase 

393 saved_nested_seconds = self._nested_phase_seconds 

394 self.stats_per_test_case = [] 

395 self._current_phase = phase 

396 self._nested_phase_seconds = 0.0 

397 start_time = time.perf_counter() 

398 try: 

399 yield 

400 finally: 

401 elapsed = time.perf_counter() - start_time 

402 # A phase can be entered more than once (the explain phase runs once 

403 # per shrinking target), so accumulate into any existing bucket. 

404 stats = self.statistics.setdefault( 

405 phase + "-phase", # type: ignore 

406 {"duration-seconds": 0.0, "test-cases": []}, 

407 ) 

408 stats["duration-seconds"] += elapsed - self._nested_phase_seconds 

409 stats["test-cases"] += self.stats_per_test_case 

410 stats["distinct-failures"] = len(self.interesting_examples) 

411 stats["shrinks-successful"] = self.shrinks 

412 self.stats_per_test_case = saved_stats 

413 self._current_phase = saved_phase 

414 self._nested_phase_seconds = saved_nested_seconds + elapsed 

415 

416 @property 

417 def should_optimise(self) -> bool: 

418 return Phase.target in self.settings.phases 

419 

420 def __tree_is_exhausted(self) -> bool: 

421 return self.tree.is_exhausted and self.using_hypothesis_backend 

422 

423 def __stoppable_test_function(self, data: ConjectureData) -> None: 

424 """Run ``self._test_function``, but convert a ``StopTest`` exception 

425 into a normal return and avoid raising anything flaky for RecursionErrors. 

426 """ 

427 # We ensure that the test has this much stack space remaining, no 

428 # matter the size of the stack when called, to de-flake RecursionErrors 

429 # (#2494, #3671). Note, this covers the data generation part of the test; 

430 # the actual test execution is additionally protected at the call site 

431 # in hypothesis.core.execute_once. 

432 with ensure_free_stackframes(): 

433 try: 

434 self._test_function(data) 

435 except StopTest as e: 

436 if e.testcounter == data.testcounter: 

437 # This StopTest has successfully stopped its test, and can now 

438 # be discarded. 

439 pass 

440 else: 

441 # This StopTest was raised by a different ConjectureData. We 

442 # need to re-raise it so that it will eventually reach the 

443 # correct engine. 

444 raise 

445 

446 def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]: 

447 return choices_key(choices) 

448 

449 def _cache(self, data: ConjectureData) -> None: 

450 result = data.as_result() 

451 key = self._cache_key(data.choices) 

452 self.__data_cache[key] = result 

453 

454 def cached_test_function( 

455 self, 

456 choices: Sequence[ChoiceT | ChoiceTemplate], 

457 *, 

458 error_on_discard: bool = False, 

459 extend: int | Literal["full"] = 0, 

460 ) -> ConjectureResult | _Overrun: 

461 """ 

462 If ``error_on_discard`` is set to True this will raise ``ContainsDiscard`` 

463 in preference to running the actual test function. This is to allow us 

464 to skip test cases we expect to be redundant in some cases. Note that 

465 it may be the case that we don't raise ``ContainsDiscard`` even if the 

466 result has discards if we cannot determine from previous runs whether 

467 it will have a discard. 

468 """ 

469 # node templates represent a not-yet-filled hole and therefore cannot 

470 # be cached or retrieved from the cache. 

471 if not any(isinstance(choice, ChoiceTemplate) for choice in choices): 

472 # this type cast is validated by the isinstance check above (ie, there 

473 # are no ChoiceTemplate elements). 

474 choices = cast(Sequence[ChoiceT], choices) 

475 key = self._cache_key(choices) 

476 try: 

477 cached = self.__data_cache[key] 

478 # if we have a cached overrun for this key, but we're allowing extensions 

479 # of the nodes, it could in fact run to a valid data if we try. 

480 if extend == 0 or cached.status is not Status.OVERRUN: 

481 return cached 

482 except KeyError: 

483 pass 

484 

485 if extend == "full": 

486 max_length = None 

487 elif (count := choice_count(choices)) is None: 

488 max_length = None 

489 else: 

490 max_length = count + extend 

491 

492 # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver. 

493 # The reason is we don't expect simulate_test_function to explore new choices 

494 # and write back to the tree, so we don't want the overhead of the 

495 # TreeRecordingObserver tracking those calls. 

496 trial_observer: DataObserver | None = DataObserver() 

497 if error_on_discard: 

498 trial_observer = DiscardObserver() 

499 

500 try: 

501 trial_data = self.new_conjecture_data( 

502 choices, observer=trial_observer, max_choices=max_length 

503 ) 

504 self.tree.simulate_test_function(trial_data) 

505 except PreviouslyUnseenBehaviour: 

506 pass 

507 else: 

508 trial_data.freeze() 

509 key = self._cache_key(trial_data.choices) 

510 if trial_data.status > Status.OVERRUN: 

511 try: 

512 return self.__data_cache[key] 

513 except KeyError: 

514 pass 

515 else: 

516 # if we simulated to an overrun, then we our result is certainly 

517 # an overrun; no need to consult the cache. (and we store this result 

518 # for simulation-less lookup later). 

519 self.__data_cache[key] = Overrun 

520 return Overrun 

521 try: 

522 return self.__data_cache[key] 

523 except KeyError: 

524 pass 

525 

526 data = self.new_conjecture_data(choices, max_choices=max_length) 

527 # note that calling test_function caches `data` for us. 

528 self.test_function(data) 

529 return data.as_result() 

530 

531 def test_function(self, data: ConjectureData) -> None: 

532 if self.__pending_call_explanation is not None: 

533 self.debug(self.__pending_call_explanation) 

534 self.__pending_call_explanation = None 

535 

536 self.call_count += 1 

537 interrupted = False 

538 

539 def _backend_cannot_proceed( 

540 exc: BackendCannotProceed, data: ConjectureData 

541 ) -> None: 

542 if exc.scope in ("verified", "exhausted"): 

543 self._switch_to_hypothesis_provider = True 

544 if exc.scope == "verified": 

545 self._verified_by_backend = self.settings.backend 

546 elif exc.scope == "discard_test_case": 

547 self._backend_discard_count += 1 

548 if ( 

549 self._backend_discard_count > 10 

550 and (self._backend_discard_count / self.call_count) > 0.2 

551 ): 

552 verbose_report( 

553 f"Switching away from backend {self.settings.backend!r} " 

554 "to the Hypothesis backend, " 

555 f"because {self._backend_discard_count} of {self.call_count} " 

556 "attempted test cases " 

557 f"({self._backend_discard_count / self.call_count * 100:0.1f}%) " 

558 f"were discarded by backend {self.settings.backend!r}" 

559 ) 

560 self._switch_to_hypothesis_provider = True 

561 

562 # treat all BackendCannotProceed exceptions as invalid. This isn't 

563 # great; "verified" should really be counted as self.valid_examples += 1. 

564 # But we check self.valid_examples == 0 to determine whether to raise 

565 # Unsatisfiable, and that would throw this check off. 

566 self.invalid_examples += 1 

567 data.cannot_proceed_scope = exc.scope 

568 

569 # this fiddly bit of control flow is to work around `return` being 

570 # disallowed in `finally` blocks as of python 3.14. Otherwise, we would 

571 # just return in the _backend_cannot_proceed branch. 

572 finally_early_return = False 

573 

574 try: 

575 self.__stoppable_test_function(data) 

576 except KeyboardInterrupt: 

577 interrupted = True 

578 raise 

579 except BackendCannotProceed as exc: 

580 _backend_cannot_proceed(exc, data) 

581 # skip the post-test-case tracking; we're pretending this never happened 

582 interrupted = True 

583 data.freeze() 

584 return 

585 except BaseException as err: 

586 data.freeze() 

587 if isinstance(err, FlakyStrategyDefinition) and data._stateful_repr_parts: 

588 # In a stateful test, surface the steps leading up to the 

589 # inconsistency. 

590 report( 

591 "Steps leading up to this error:\n" 

592 + "\n".join(f" {s}" for s in data._stateful_repr_parts) 

593 ) 

594 if self.settings.backend != "hypothesis": 

595 try: 

596 realize_choices(data, for_failure=True) 

597 except BackendCannotProceed as exc: 

598 _backend_cannot_proceed(exc, data) 

599 # skip the post-test-case tracking; we're pretending this 

600 # never happened 

601 interrupted = True 

602 return 

603 self.save_choices(data.choices) 

604 raise 

605 finally: 

606 # No branch, because if we're interrupted we always raise 

607 # the KeyboardInterrupt, never continue to the code below. 

608 if not interrupted: # pragma: no branch 

609 assert data.cannot_proceed_scope is None 

610 data.freeze() 

611 

612 if self.settings.backend != "hypothesis": 

613 try: 

614 realize_choices( 

615 data, for_failure=data.status is Status.INTERESTING 

616 ) 

617 except BackendCannotProceed as exc: 

618 _backend_cannot_proceed(exc, data) 

619 finally_early_return = True 

620 

621 if not finally_early_return: 

622 call_stats: CallStats = { 

623 "status": data.status.name.lower(), 

624 "runtime": data.finish_time - data.start_time, 

625 "drawtime": math.fsum(data.draw_times.values()), 

626 "gctime": data.gc_finish_time - data.gc_start_time, 

627 "events": sorted( 

628 k if v == "" else f"{k}: {v}" 

629 for k, v in data.events.items() 

630 ), 

631 } 

632 self.stats_per_test_case.append(call_stats) 

633 

634 self._cache(data) 

635 if ( 

636 data.misaligned_at is not None 

637 ): # pragma: no branch # coverage bug? 

638 self.misaligned_count += 1 

639 

640 if finally_early_return: 

641 return 

642 

643 self.debug_data(data) 

644 

645 if ( 

646 data.target_observations 

647 and self.pareto_front is not None 

648 and self.pareto_front.add(data.as_result()) 

649 ): 

650 self.save_choices(data.choices, sub_key=b"pareto") 

651 

652 if data.status >= Status.VALID: 

653 for k, v in data.target_observations.items(): 

654 self.best_observed_targets[k] = max(self.best_observed_targets[k], v) 

655 

656 if k not in self.best_examples_of_observed_targets: 

657 data_as_result = data.as_result() 

658 assert not isinstance(data_as_result, _Overrun) 

659 self.best_examples_of_observed_targets[k] = data_as_result 

660 continue 

661 

662 existing_example = self.best_examples_of_observed_targets[k] 

663 existing_score = existing_example.target_observations[k] 

664 

665 if v < existing_score: 

666 continue 

667 

668 if v > existing_score or sort_key(data.nodes) < sort_key( 

669 existing_example.nodes 

670 ): 

671 data_as_result = data.as_result() 

672 assert not isinstance(data_as_result, _Overrun) 

673 self.best_examples_of_observed_targets[k] = data_as_result 

674 

675 if data.status is Status.VALID: 

676 self.valid_examples += 1 

677 if data.status is Status.INVALID: 

678 self.invalid_examples += 1 

679 if data.status is Status.OVERRUN: 

680 self.overrun_examples += 1 

681 

682 if data.status == Status.INTERESTING: 

683 if not self.using_hypothesis_backend: 

684 # replay this failure on the hypothesis backend to ensure it still 

685 # finds a failure. otherwise, it is flaky. 

686 initial_exception = data.expected_exception 

687 data = ConjectureData.for_choices(data.choices) 

688 # we've already going to use the hypothesis provider for this 

689 # data, so the verb "switch" is a bit misleading here. We're really 

690 # setting this to inform our on_observation logic that the observation 

691 # generated here was from a hypothesis backend, and shouldn't be 

692 # sent to the on_observation of any alternative backend. 

693 with self._with_switch_to_hypothesis_provider(True): 

694 self.__stoppable_test_function(data) 

695 data.freeze() 

696 # TODO: Should same-origin also be checked? (discussion in 

697 # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487) 

698 if data.status != Status.INTERESTING: 

699 desc_new_status = { 

700 data.status.VALID: "passed", 

701 data.status.INVALID: "failed filters", 

702 data.status.OVERRUN: "overran", 

703 }[data.status] 

704 raise FlakyBackendFailure( 

705 f"Inconsistent results from replaying a failing test case! " 

706 f"Raised {type(initial_exception).__name__} on " 

707 f"backend={self.settings.backend!r}, but " 

708 f"{desc_new_status} under backend='hypothesis'.", 

709 [initial_exception], 

710 ) 

711 

712 self._cache(data) 

713 

714 assert data.interesting_origin is not None 

715 key = data.interesting_origin 

716 changed = False 

717 try: 

718 existing = self.interesting_examples[key] 

719 except KeyError: 

720 changed = True 

721 self.last_bug_found_at = self.call_count 

722 if self.first_bug_found_at is None: 

723 self.first_bug_found_at = self.call_count 

724 self.first_bug_found_time = time.monotonic() 

725 else: 

726 if sort_key(data.nodes) < sort_key(existing.nodes): 

727 self.shrinks += 1 

728 self.downgrade_choices(existing.choices) 

729 self.__data_cache.unpin(self._cache_key(existing.choices)) 

730 changed = True 

731 

732 if changed: 

733 self.save_choices(data.choices) 

734 self.interesting_examples[key] = data.as_result() # type: ignore 

735 if not self.using_hypothesis_backend: 

736 self._backend_found_failure = True 

737 self.__data_cache.pin(self._cache_key(data.choices), data.as_result()) 

738 self.shrunk_examples.discard(key) 

739 

740 if self.shrinks >= MAX_SHRINKS: 

741 self.exit_with(ExitReason.max_shrinks) 

742 

743 if ( 

744 not self.ignore_limits 

745 and self.finish_shrinking_deadline is not None 

746 and self.finish_shrinking_deadline < time.perf_counter() 

747 ): 

748 # See https://github.com/HypothesisWorks/hypothesis/issues/2340 

749 report( 

750 "WARNING: Hypothesis has spent more than five minutes working to shrink" 

751 " a failing example, and stopped because it is making very slow" 

752 " progress. When you re-run your tests, shrinking will resume and may" 

753 " take this long before aborting again.\n\nPLEASE REPORT THIS if you can" 

754 " provide a reproducing example, so that we can improve shrinking" 

755 " performance for everyone." 

756 ) 

757 self.exit_with(ExitReason.very_slow_shrinking) 

758 

759 if not self.interesting_examples: 

760 # Note that this logic is reproduced to end the generation phase when 

761 # we have interesting examples. Update that too if you change this! 

762 # (The doubled implementation is because here we exit the engine entirely, 

763 # while in the other case below we just want to move on to shrinking.) 

764 if self.valid_examples >= self.settings.max_examples: 

765 self.exit_with(ExitReason.max_examples) 

766 if (self.invalid_examples + self.overrun_examples) > ( 

767 INVALID_THRESHOLD_BASE + INVALID_PER_VALID * self.valid_examples 

768 ): 

769 self.exit_with(ExitReason.max_iterations) 

770 

771 if self.__tree_is_exhausted(): 

772 self.exit_with(ExitReason.finished) 

773 

774 self.record_for_health_check(data) 

775 

776 def on_pareto_evict(self, data: ConjectureResult) -> None: 

777 self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices)) 

778 

779 def generate_novel_prefix(self) -> tuple[ChoiceT, ...]: 

780 """Uses the tree to proactively generate a starting choice sequence 

781 that we haven't explored yet for this test. 

782 

783 When this method is called, we assume that there must be at 

784 least one novel prefix left to find. If there were not, then the 

785 test run should have already stopped due to tree exhaustion. 

786 """ 

787 return self.tree.generate_novel_prefix(self.random) 

788 

789 def record_for_health_check(self, data: ConjectureData) -> None: 

790 # Once we've actually found a bug, there's no point in trying to run 

791 # health checks - they'll just mask the actually important information. 

792 if data.status == Status.INTERESTING: 

793 self.health_check_state = None 

794 

795 state = self.health_check_state 

796 

797 if state is None: 

798 return 

799 

800 for k, v in data.draw_times.items(): 

801 state.draw_times[k].append(v) 

802 

803 if data.status == Status.VALID: 

804 state.valid_examples += 1 

805 elif data.status == Status.INVALID: 

806 state.invalid_examples += 1 

807 else: 

808 assert data.status == Status.OVERRUN 

809 state.overrun_examples += 1 

810 

811 max_valid_draws = 10 

812 max_invalid_draws = 50 

813 max_overrun_draws = 20 

814 

815 assert state.valid_examples <= max_valid_draws 

816 

817 if state.valid_examples == max_valid_draws: 

818 self.health_check_state = None 

819 return 

820 

821 if state.overrun_examples == max_overrun_draws: 

822 fail_health_check( 

823 self.settings, 

824 "Generated inputs routinely consumed more than the maximum " 

825 f"allowed entropy: {state.valid_examples} inputs were generated " 

826 f"successfully, while {state.overrun_examples} inputs exceeded the " 

827 f"maximum allowed entropy during generation." 

828 "\n\n" 

829 f"Testing with inputs this large tends to be slow, and to produce " 

830 "failures that are both difficult to shrink and difficult to understand. " 

831 "Try decreasing the amount of data generated, for example by " 

832 "decreasing the minimum size of collection strategies like " 

833 "st.lists()." 

834 "\n\n" 

835 "If you expect the average size of your input to be this large, " 

836 "you can disable this health check with " 

837 "@settings(suppress_health_check=[HealthCheck.data_too_large]). " 

838 "See " 

839 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

840 "for details.", 

841 HealthCheck.data_too_large, 

842 ) 

843 if state.invalid_examples == max_invalid_draws: 

844 fail_health_check( 

845 self.settings, 

846 "It looks like this test is filtering out a lot of inputs. " 

847 f"{state.valid_examples} inputs were generated successfully, " 

848 f"while {state.invalid_examples} inputs were filtered out. " 

849 "\n\n" 

850 "An input might be filtered out by calls to assume(), " 

851 "strategy.filter(...), or occasionally by Hypothesis internals." 

852 "\n\n" 

853 "Applying this much filtering makes input generation slow, since " 

854 "Hypothesis must discard inputs which are filtered out and try " 

855 "generating it again. It is also possible that applying this much " 

856 "filtering will distort the domain and/or distribution of the test, " 

857 "leaving your testing less rigorous than expected." 

858 "\n\n" 

859 "If you expect this many inputs to be filtered out during generation, " 

860 "you can disable this health check with " 

861 "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See " 

862 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

863 "for details.", 

864 HealthCheck.filter_too_much, 

865 ) 

866 

867 # Allow at least the greater of one second or 5x the deadline. If deadline 

868 # is None, allow 30s - the user can disable the healthcheck too if desired. 

869 draw_time = state.total_draw_time 

870 draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6)) 

871 if ( 

872 draw_time > max(1.0, draw_time_limit.total_seconds()) 

873 # we disable HealthCheck.too_slow under concurrent threads, since 

874 # cpython may switch away from a thread for arbitrarily long. 

875 and not self.thread_overlap.get(threading.get_ident(), False) 

876 ): 

877 extra_str = [] 

878 if state.invalid_examples: 

879 extra_str.append(f"{state.invalid_examples} invalid inputs") 

880 if state.overrun_examples: 

881 extra_str.append( 

882 f"{state.overrun_examples} inputs which exceeded the " 

883 "maximum allowed entropy" 

884 ) 

885 extra_str = ", and ".join(extra_str) 

886 extra_str = f" ({extra_str})" if extra_str else "" 

887 

888 fail_health_check( 

889 self.settings, 

890 "Input generation is slow: Hypothesis only generated " 

891 f"{state.valid_examples} valid inputs after {draw_time:.2f} " 

892 f"seconds{extra_str}." 

893 "\n" + state.timing_report() + "\n\n" 

894 "This could be for a few reasons:" 

895 "\n" 

896 "1. This strategy could be generating too much data per input. " 

897 "Try decreasing the amount of data generated, for example by " 

898 "decreasing the minimum size of collection strategies like " 

899 "st.lists()." 

900 "\n" 

901 "2. Some other expensive computation could be running during input " 

902 "generation. For example, " 

903 "if @st.composite or st.data() is interspersed with an expensive " 

904 "computation, HealthCheck.too_slow is likely to trigger. If this " 

905 "computation is unrelated to input generation, move it elsewhere. " 

906 "Otherwise, try making it more efficient, or disable this health " 

907 "check if that is not possible." 

908 "\n\n" 

909 "If you expect input generation to take this long, you can disable " 

910 "this health check with " 

911 "@settings(suppress_health_check=[HealthCheck.too_slow]). See " 

912 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

913 "for details.", 

914 HealthCheck.too_slow, 

915 ) 

916 

917 def save_choices( 

918 self, choices: Sequence[ChoiceT], sub_key: bytes | None = None 

919 ) -> None: 

920 if self.settings.database is not None: 

921 key = self.sub_key(sub_key) 

922 if key is None: 

923 return 

924 self.settings.database.save(key, choices_to_bytes(choices)) 

925 

926 def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None: 

927 buffer = choices_to_bytes(choices) 

928 if self.settings.database is not None and self.database_key is not None: 

929 self.settings.database.move(self.database_key, self.secondary_key, buffer) 

930 

931 def sub_key(self, sub_key: bytes | None) -> bytes | None: 

932 if self.database_key is None: 

933 return None 

934 if sub_key is None: 

935 return self.database_key 

936 return b".".join((self.database_key, sub_key)) 

937 

938 @property 

939 def secondary_key(self) -> bytes | None: 

940 return self.sub_key(b"secondary") 

941 

942 @property 

943 def pareto_key(self) -> bytes | None: 

944 return self.sub_key(b"pareto") 

945 

946 def debug(self, message: str) -> None: 

947 if self.settings.verbosity >= Verbosity.debug: 

948 base_report(message) 

949 

950 @property 

951 def report_debug_info(self) -> bool: 

952 return self.settings.verbosity >= Verbosity.debug 

953 

954 def debug_data(self, data: ConjectureData | ConjectureResult) -> None: 

955 if not self.report_debug_info: 

956 return 

957 

958 status = repr(data.status) 

959 if data.status == Status.INTERESTING: 

960 status = f"{status} ({data.interesting_origin!r})" 

961 elif data.status == Status.INVALID and isinstance(data, ConjectureData): 

962 assert isinstance(data, ConjectureData) # mypy is silly 

963 status = f"{status} ({data.events.get('invalid because', '?')})" 

964 

965 newline_tab = "\n\t" 

966 self.debug( 

967 f"{len(data.choices)} choices -> {status}\n\t{data.choices}" 

968 f"{newline_tab + data.output if data.output else ''}" 

969 ) 

970 

971 def observe_for_provider(self) -> AbstractContextManager: 

972 def on_observation(observation: Observation) -> None: 

973 assert observation.type == "test_case" 

974 # because lifetime == "test_function" 

975 assert isinstance(self.provider, PrimitiveProvider) 

976 # only fire if we actually used that provider to generate this observation 

977 if not self._switch_to_hypothesis_provider: 

978 self.provider.on_observation(observation) 

979 

980 if ( 

981 self.settings.backend != "hypothesis" 

982 # only for lifetime = "test_function" providers (guaranteed 

983 # by this isinstance check) 

984 and isinstance(self.provider, PrimitiveProvider) 

985 # and the provider opted-in to observations 

986 and self.provider.add_observability_callback 

987 ): 

988 return with_observability_callback(on_observation) 

989 return nullcontext() 

990 

991 def run(self) -> None: 

992 with local_settings(self.settings), self.observe_for_provider(): 

993 try: 

994 self._run() 

995 except RunIsComplete: 

996 pass 

997 for v in self.interesting_examples.values(): 

998 self.debug_data(v) 

999 self.debug( 

1000 f"Run complete after {self.call_count} examples " 

1001 f"({self.valid_examples} valid) and {self.shrinks} shrinks" 

1002 ) 

1003 

1004 @property 

1005 def database(self) -> ExampleDatabase | None: 

1006 if self.database_key is None: 

1007 return None 

1008 return self.settings.database 

1009 

1010 def has_existing_examples(self) -> bool: 

1011 return self.database is not None and Phase.reuse in self.settings.phases 

1012 

1013 def reuse_existing_examples(self) -> None: 

1014 """If appropriate (we have a database and have been told to use it), 

1015 try to reload existing examples from the database. 

1016 

1017 If there are a lot we don't try all of them. We always try the 

1018 smallest example in the database (which is guaranteed to be the 

1019 last failure) and the largest (which is usually the seed example 

1020 which the last failure came from but we don't enforce that). We 

1021 then take a random sampling of the remainder and try those. Any 

1022 examples that are no longer interesting are cleared out. 

1023 """ 

1024 if self.has_existing_examples(): 

1025 self.debug("Reusing examples from database") 

1026 # We have to do some careful juggling here. We have two database 

1027 # corpora: The primary and secondary. The primary corpus is a 

1028 # small set of minimized examples each of which has at one point 

1029 # demonstrated a distinct bug. We want to retry all of these. 

1030 

1031 # We also have a secondary corpus of examples that have at some 

1032 # point demonstrated interestingness (currently only ones that 

1033 # were previously non-minimal examples of a bug, but this will 

1034 # likely expand in future). These are a good source of potentially 

1035 # interesting examples, but there are a lot of them, so we down 

1036 # sample the secondary corpus to a more manageable size. 

1037 

1038 corpus = sorted( 

1039 self.settings.database.fetch(self.database_key), key=shortlex 

1040 ) 

1041 factor = 0.1 if (Phase.generate in self.settings.phases) else 1 

1042 desired_size = max(2, ceil(factor * self.settings.max_examples)) 

1043 primary_corpus_size = len(corpus) 

1044 

1045 if len(corpus) < desired_size: 

1046 extra_corpus = list(self.settings.database.fetch(self.secondary_key)) 

1047 

1048 shortfall = desired_size - len(corpus) 

1049 

1050 if len(extra_corpus) <= shortfall: 

1051 extra = extra_corpus 

1052 else: 

1053 extra = self.random.sample(extra_corpus, shortfall) 

1054 extra.sort(key=shortlex) 

1055 corpus.extend(extra) 

1056 

1057 # We want a fast path where every primary entry in the database was 

1058 # interesting. 

1059 found_interesting_in_primary = False 

1060 all_interesting_in_primary_were_exact = True 

1061 

1062 for i, existing in enumerate(corpus): 

1063 if i >= primary_corpus_size and found_interesting_in_primary: 

1064 break 

1065 choices = choices_from_bytes(existing) 

1066 if choices is None: 

1067 # clear out any keys which fail deserialization 

1068 self.settings.database.delete(self.database_key, existing) 

1069 continue 

1070 data = self.cached_test_function(choices, extend="full") 

1071 if data.status != Status.INTERESTING: 

1072 self.settings.database.delete(self.database_key, existing) 

1073 self.settings.database.delete(self.secondary_key, existing) 

1074 else: 

1075 if i < primary_corpus_size: 

1076 found_interesting_in_primary = True 

1077 assert not isinstance(data, _Overrun) 

1078 if choices_key(choices) != choices_key(data.choices): 

1079 all_interesting_in_primary_were_exact = False 

1080 if not self.settings.report_multiple_bugs: 

1081 break 

1082 if found_interesting_in_primary: 

1083 if all_interesting_in_primary_were_exact: 

1084 self.reused_previously_shrunk_test_case = True 

1085 

1086 # Because self.database is not None (because self.has_existing_examples()) 

1087 # and self.database_key is not None (because we fetched using it above), 

1088 # we can guarantee self.pareto_front is not None 

1089 assert self.pareto_front is not None 

1090 

1091 # If we've not found any interesting examples so far we try some of 

1092 # the pareto front from the last run. 

1093 if len(corpus) < desired_size and not self.interesting_examples: 

1094 desired_extra = desired_size - len(corpus) 

1095 pareto_corpus = list(self.settings.database.fetch(self.pareto_key)) 

1096 if len(pareto_corpus) > desired_extra: 

1097 pareto_corpus = self.random.sample(pareto_corpus, desired_extra) 

1098 pareto_corpus.sort(key=shortlex) 

1099 

1100 for existing in pareto_corpus: 

1101 choices = choices_from_bytes(existing) 

1102 if choices is None: 

1103 self.settings.database.delete(self.pareto_key, existing) 

1104 continue 

1105 data = self.cached_test_function(choices, extend="full") 

1106 if data not in self.pareto_front: 

1107 self.settings.database.delete(self.pareto_key, existing) 

1108 if data.status == Status.INTERESTING: 

1109 break 

1110 

1111 def exit_with(self, reason: ExitReason) -> None: 

1112 if self.ignore_limits: 

1113 return 

1114 self.statistics["stopped-because"] = reason.describe(self.settings) 

1115 if self.best_observed_targets: 

1116 self.statistics["targets"] = dict(self.best_observed_targets) 

1117 self.debug(f"exit_with({reason.name})") 

1118 self.exit_reason = reason 

1119 raise RunIsComplete 

1120 

1121 def should_generate_more(self) -> bool: 

1122 # End the generation phase where we would have ended it if no bugs had 

1123 # been found. This reproduces the exit logic in `self.test_function`, 

1124 # but with the important distinction that this clause will move on to 

1125 # the shrinking phase having found one or more bugs, while the other 

1126 # will exit having found zero bugs. 

1127 invalid_threshold = ( 

1128 INVALID_THRESHOLD_BASE + INVALID_PER_VALID * self.valid_examples 

1129 ) 

1130 if ( 

1131 self.valid_examples >= self.settings.max_examples 

1132 or (self.invalid_examples + self.overrun_examples) > invalid_threshold 

1133 ): # pragma: no cover 

1134 return False 

1135 

1136 # If we haven't found a bug, keep looking - if we hit any limits on 

1137 # the number of tests to run that will raise an exception and stop 

1138 # the run. 

1139 if not self.interesting_examples: 

1140 return True 

1141 # Users who disable shrinking probably want to exit as fast as possible. 

1142 # If we've found a bug and won't report more than one, stop looking. 

1143 # If we first saw a bug more than 10 seconds ago, stop looking. 

1144 elif ( 

1145 Phase.shrink not in self.settings.phases 

1146 or not self.settings.report_multiple_bugs 

1147 or time.monotonic() - self.first_bug_found_time > 10 

1148 ): 

1149 return False 

1150 assert isinstance(self.first_bug_found_at, int) 

1151 assert isinstance(self.last_bug_found_at, int) 

1152 assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count 

1153 # Otherwise, keep searching for between ten and 'a heuristic' calls. 

1154 # We cap 'calls after first bug' so errors are reported reasonably 

1155 # soon even for tests that are allowed to run for a very long time, 

1156 # or sooner if the latest half of our test effort has been fruitless. 

1157 return self.call_count < MIN_TEST_CALLS or self.call_count < min( 

1158 self.first_bug_found_at + 1000, self.last_bug_found_at * 2 

1159 ) 

1160 

1161 def generate_new_examples(self) -> None: 

1162 if Phase.generate not in self.settings.phases: 

1163 return 

1164 if self.interesting_examples: 

1165 # The example database has failing examples from a previous run, 

1166 # so we'd rather report that they're still failing ASAP than take 

1167 # the time to look for additional failures. 

1168 return 

1169 

1170 self.debug("Generating new examples") 

1171 

1172 assert self.should_generate_more() 

1173 self._switch_to_hypothesis_provider = True 

1174 zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),)) 

1175 if zero_data.status > Status.OVERRUN: 

1176 assert isinstance(zero_data, ConjectureResult) 

1177 # if the crosshair backend cannot proceed, it does not (and cannot) 

1178 # realize the symbolic values, with the intent that Hypothesis will 

1179 # throw away this test case. We usually do, but if it's the zero data 

1180 # then we try to pin it here, which requires realizing the symbolics. 

1181 # 

1182 # We don't (yet) rely on the zero data being pinned, and so 

1183 # it's simply a very slight performance loss to simply not pin it 

1184 # if doing so would error. 

1185 if zero_data.cannot_proceed_scope is None: # pragma: no branch 

1186 self.__data_cache.pin( 

1187 self._cache_key(zero_data.choices), zero_data.as_result() 

1188 ) # Pin forever 

1189 

1190 if zero_data.status == Status.OVERRUN or ( 

1191 zero_data.status == Status.VALID 

1192 and isinstance(zero_data, ConjectureResult) 

1193 and zero_data.length * 2 > BUFFER_SIZE 

1194 ): 

1195 fail_health_check( 

1196 self.settings, 

1197 "The smallest natural input for this test is very " 

1198 "large. This makes it difficult for Hypothesis to generate " 

1199 "good inputs, especially when trying to shrink failing inputs." 

1200 "\n\n" 

1201 "Consider reducing the amount of data generated by the strategy. " 

1202 "Also consider introducing small alternative values for some " 

1203 "strategies. For example, could you " 

1204 "mark some arguments as optional by replacing `some_complex_strategy`" 

1205 "with `st.none() | some_complex_strategy`?" 

1206 "\n\n" 

1207 "If you are confident that the size of the smallest natural input " 

1208 "to your test cannot be reduced, you can suppress this health check " 

1209 "with @settings(suppress_health_check=[HealthCheck.large_base_example]). " 

1210 "See " 

1211 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

1212 "for details.", 

1213 HealthCheck.large_base_example, 

1214 ) 

1215 

1216 self.health_check_state = HealthCheckState() 

1217 

1218 # We attempt to use the size of the minimal generated test case starting 

1219 # from a given novel prefix as a guideline to generate smaller test 

1220 # cases for an initial period, by restriscting ourselves to test cases 

1221 # that are not much larger than it. 

1222 # 

1223 # Calculating the actual minimal generated test case is hard, so we 

1224 # take a best guess that zero extending a prefix produces the minimal 

1225 # test case starting with that prefix (this is true for our built in 

1226 # strategies). This is only a reasonable thing to do if the resulting 

1227 # test case is valid. If we regularly run into situations where it is 

1228 # not valid then this strategy is a waste of time, so we want to 

1229 # abandon it early. In order to do this we track how many times in a 

1230 # row it has failed to work, and abort small test case generation when 

1231 # it has failed too many times in a row. 

1232 consecutive_zero_extend_is_invalid = 0 

1233 

1234 # We control growth during initial example generation, for two 

1235 # reasons: 

1236 # 

1237 # * It gives us an opportunity to find small examples early, which 

1238 # gives us a fast path for easy to find bugs. 

1239 # * It avoids low probability events where we might end up 

1240 # generating very large examples during health checks, which 

1241 # on slower machines can trigger HealthCheck.too_slow. 

1242 # 

1243 # The heuristic we use is that we attempt to estimate the smallest 

1244 # extension of this prefix, and limit the size to no more than 

1245 # an order of magnitude larger than that. If we fail to estimate 

1246 # the size accurately, we skip over this prefix and try again. 

1247 # 

1248 # We need to tune the example size based on the initial prefix, 

1249 # because any fixed size might be too small, and any size based 

1250 # on the strategy in general can fall afoul of strategies that 

1251 # have very different sizes for different prefixes. 

1252 # 

1253 # We previously set a minimum value of 10 on small_example_cap, with the 

1254 # reasoning of avoiding flaky health checks. However, some users set a 

1255 # low max_examples for performance. A hard lower bound in this case biases 

1256 # the distribution towards small (and less powerful) examples. Flaky 

1257 # and loud health checks are better than silent performance degradation. 

1258 small_example_cap = min(self.settings.max_examples // 10, 50) 

1259 optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10) 

1260 ran_optimisations = False 

1261 self._switch_to_hypothesis_provider = False 

1262 

1263 while self.should_generate_more(): 

1264 # we don't yet integrate DataTree with backends. Instead of generating 

1265 # a novel prefix, ask the backend for an input. 

1266 if not self.using_hypothesis_backend: 

1267 data = self.new_conjecture_data([]) 

1268 self.test_function(data) 

1269 continue 

1270 

1271 self._current_phase = "generate" 

1272 prefix = self.generate_novel_prefix() 

1273 if ( 

1274 self.valid_examples <= small_example_cap 

1275 and self.call_count <= 5 * small_example_cap 

1276 and not self.interesting_examples 

1277 and consecutive_zero_extend_is_invalid < 5 

1278 ): 

1279 minimal_example = self.cached_test_function( 

1280 prefix + (ChoiceTemplate("simplest", count=None),) 

1281 ) 

1282 

1283 if minimal_example.status < Status.VALID: 

1284 consecutive_zero_extend_is_invalid += 1 

1285 continue 

1286 # Because the Status code is greater than Status.VALID, it cannot be 

1287 # Status.OVERRUN, which guarantees that the minimal_example is a 

1288 # ConjectureResult object. 

1289 assert isinstance(minimal_example, ConjectureResult) 

1290 consecutive_zero_extend_is_invalid = 0 

1291 minimal_extension = len(minimal_example.choices) - len(prefix) 

1292 max_length = len(prefix) + minimal_extension * 5 

1293 

1294 # We could end up in a situation where even though the prefix was 

1295 # novel when we generated it, because we've now tried zero extending 

1296 # it not all possible continuations of it will be novel. In order to 

1297 # avoid making redundant test calls, we rerun it in simulation mode 

1298 # first. If this has a predictable result, then we don't bother 

1299 # running the test function for real here. If however we encounter 

1300 # some novel behaviour, we try again with the real test function, 

1301 # starting from the new novel prefix that has discovered. 

1302 trial_data = self.new_conjecture_data(prefix, max_choices=max_length) 

1303 try: 

1304 self.tree.simulate_test_function(trial_data) 

1305 continue 

1306 except PreviouslyUnseenBehaviour: 

1307 pass 

1308 

1309 # If the simulation entered part of the tree that has been killed, 

1310 # we don't want to run this. 

1311 assert isinstance(trial_data.observer, TreeRecordingObserver) 

1312 if trial_data.observer.killed: 

1313 continue 

1314 

1315 # We might have hit the cap on number of examples we should 

1316 # run when calculating the minimal example. 

1317 if not self.should_generate_more(): 

1318 break 

1319 

1320 prefix = trial_data.choices 

1321 else: 

1322 max_length = None 

1323 

1324 data = self.new_conjecture_data(prefix, max_choices=max_length) 

1325 self.test_function(data) 

1326 

1327 if ( 

1328 data.status is Status.OVERRUN 

1329 and max_length is not None 

1330 and "invalid because" not in data.events 

1331 ): 

1332 data.events["invalid because"] = ( 

1333 "reduced max size for early examples (avoids flaky health checks)" 

1334 ) 

1335 

1336 self.generate_mutations_from(data) 

1337 

1338 # Although the optimisations are logically a distinct phase, we 

1339 # actually normally run them as part of example generation. The 

1340 # reason for this is that we cannot guarantee that optimisation 

1341 # actually exhausts our budget: It might finish running and we 

1342 # discover that actually we still could run a bunch more test cases 

1343 # if we want. 

1344 if ( 

1345 self.valid_examples >= max(small_example_cap, optimise_at) 

1346 and not ran_optimisations 

1347 ): 

1348 ran_optimisations = True 

1349 self._current_phase = "target" 

1350 self.optimise_targets() 

1351 

1352 def generate_mutations_from(self, data: ConjectureData | ConjectureResult) -> None: 

1353 # A thing that is often useful but rarely happens by accident is 

1354 # to generate the same value at multiple different points in the 

1355 # test case. 

1356 # 

1357 # Rather than make this the responsibility of individual strategies 

1358 # we implement a small mutator that just takes parts of the test 

1359 # case with the same label and tries replacing one of them with a 

1360 # copy of the other and tries running it. If we've made a good 

1361 # guess about what to put where, this will run a similar generated 

1362 # test case with more duplication. 

1363 if ( 

1364 # An OVERRUN doesn't have enough information about the test 

1365 # case to mutate, so we just skip those. 

1366 data.status >= Status.INVALID 

1367 # This has a tendency to trigger some weird edge cases during 

1368 # generation so we don't let it run until we're done with the 

1369 # health checks. 

1370 and self.health_check_state is None 

1371 ): 

1372 initial_calls = self.call_count 

1373 failed_mutations = 0 

1374 

1375 while ( 

1376 self.should_generate_more() 

1377 # We implement fairly conservative checks for how long we 

1378 # we should run mutation for, as it's generally not obvious 

1379 # how helpful it is for any given test case. 

1380 and self.call_count <= initial_calls + 5 

1381 and failed_mutations <= 5 

1382 ): 

1383 groups = data.spans.mutator_groups 

1384 if not groups: 

1385 break 

1386 

1387 group = self.random.choice(groups) 

1388 (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2) 

1389 if start1 > start2: 

1390 (start1, end1), (start2, end2) = (start2, end2), (start1, end1) 

1391 

1392 if ( 

1393 start1 <= start2 <= end2 <= end1 

1394 ): # pragma: no cover # flaky on conjecture-cover tests 

1395 # One span entirely contains the other. The strategy is very 

1396 # likely some kind of tree. e.g. we might have 

1397 # 

1398 # ┌─────┐ 

1399 # ┌─────┤ a ├──────┐ 

1400 # │ └─────┘ │ 

1401 # ┌──┴──┐ ┌──┴──┐ 

1402 # ┌──┤ b ├──┐ ┌──┤ c ├──┐ 

1403 # │ └──┬──┘ │ │ └──┬──┘ │ 

1404 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ 

1405 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │ 

1406 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ 

1407 # 

1408 # where each node is drawn from the same strategy and so 

1409 # has the same span label. We might have selected the spans 

1410 # corresponding to the a and c nodes, which is the entire 

1411 # tree and the subtree of (and including) c respectively. 

1412 # 

1413 # There are two possible mutations we could apply in this case: 

1414 # 1. replace a with c (replace child with parent) 

1415 # 2. replace c with a (replace parent with child) 

1416 # 

1417 # (1) results in multiple partial copies of the 

1418 # parent: 

1419 # ┌─────┐ 

1420 # ┌─────┤ a ├────────────┐ 

1421 # │ └─────┘ │ 

1422 # ┌──┴──┐ ┌─┴───┐ 

1423 # ┌──┤ b ├──┐ ┌─────┤ a ├──────┐ 

1424 # │ └──┬──┘ │ │ └─────┘ │ 

1425 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌──┴──┐ ┌──┴──┐ 

1426 # │ d │ │ e │ │ f │ ┌──┤ b ├──┐ ┌──┤ c ├──┐ 

1427 # └───┘ └───┘ └───┘ │ └──┬──┘ │ │ └──┬──┘ │ 

1428 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ 

1429 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │ 

1430 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ 

1431 # 

1432 # While (2) results in truncating part of the parent: 

1433 # 

1434 # ┌─────┐ 

1435 # ┌──┤ c ├──┐ 

1436 # │ └──┬──┘ │ 

1437 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ 

1438 # │ g │ │ h │ │ i │ 

1439 # └───┘ └───┘ └───┘ 

1440 # 

1441 # (1) is the same as Example IV.4. in Nautilus (NDSS '19) 

1442 # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf), 

1443 # except we do not repeat the replacement additional times 

1444 # (the paper repeats it once for a total of two copies). 

1445 # 

1446 # We currently only apply mutation (1), and ignore mutation 

1447 # (2). The reason is that the attempt generated from (2) is 

1448 # always something that Hypothesis could easily have generated 

1449 # itself, by simply not making various choices. Whereas 

1450 # duplicating the exact value + structure of particular choices 

1451 # in (1) would have been hard for Hypothesis to generate by 

1452 # chance. 

1453 # 

1454 # TODO: an extension of this mutation might repeat (1) on 

1455 # a geometric distribution between 0 and ~10 times. We would 

1456 # need to find the corresponding span to recurse on in the new 

1457 # choices, probably just by using the choices index. 

1458 

1459 # case (1): duplicate the choices in start1:start2. 

1460 attempt = data.choices[:start2] + data.choices[start1:] 

1461 else: 

1462 start, end = self.random.choice([(start1, end1), (start2, end2)]) 

1463 replacement = data.choices[start:end] 

1464 # We attempt to replace both the examples with 

1465 # whichever choice we made. Note that this might end 

1466 # up messing up and getting the example boundaries 

1467 # wrong - labels matching are only a best guess as to 

1468 # whether the two are equivalent - but it doesn't 

1469 # really matter. It may not achieve the desired result, 

1470 # but it's still a perfectly acceptable choice sequence 

1471 # to try. 

1472 attempt = ( 

1473 data.choices[:start1] 

1474 + replacement 

1475 + data.choices[end1:start2] 

1476 + replacement 

1477 + data.choices[end2:] 

1478 ) 

1479 

1480 try: 

1481 new_data = self.cached_test_function( 

1482 attempt, 

1483 # We set error_on_discard so that we don't end up 

1484 # entering parts of the tree we consider redundant 

1485 # and not worth exploring. 

1486 error_on_discard=True, 

1487 ) 

1488 except ContainsDiscard: 

1489 failed_mutations += 1 

1490 continue 

1491 

1492 if new_data is Overrun: 

1493 failed_mutations += 1 # pragma: no cover # annoying case 

1494 else: 

1495 assert isinstance(new_data, ConjectureResult) 

1496 if ( 

1497 new_data.status >= data.status 

1498 and choices_key(data.choices) != choices_key(new_data.choices) 

1499 and all( 

1500 k in new_data.target_observations 

1501 and new_data.target_observations[k] >= v 

1502 for k, v in data.target_observations.items() 

1503 ) 

1504 ): 

1505 data = new_data 

1506 failed_mutations = 0 

1507 else: 

1508 failed_mutations += 1 

1509 

1510 def optimise_targets(self) -> None: 

1511 """If any target observations have been made, attempt to optimise them 

1512 all.""" 

1513 if not self.should_optimise: 

1514 return 

1515 from hypothesis.internal.conjecture.optimiser import Optimiser 

1516 

1517 # We want to avoid running the optimiser for too long in case we hit 

1518 # an unbounded target score. We start this off fairly conservatively 

1519 # in case interesting examples are easy to find and then ramp it up 

1520 # on an exponential schedule so we don't hamper the optimiser too much 

1521 # if it needs a long time to find good enough improvements. 

1522 max_improvements = 10 

1523 while True: 

1524 prev_calls = self.call_count 

1525 

1526 any_improvements = False 

1527 

1528 for target, data in list(self.best_examples_of_observed_targets.items()): 

1529 optimiser = Optimiser( 

1530 self, data, target, max_improvements=max_improvements 

1531 ) 

1532 optimiser.run() 

1533 if optimiser.improvements > 0: 

1534 any_improvements = True 

1535 

1536 if self.interesting_examples: 

1537 break 

1538 

1539 max_improvements *= 2 

1540 

1541 if any_improvements: 

1542 continue 

1543 

1544 if self.best_observed_targets: 

1545 self.pareto_optimise() 

1546 

1547 if prev_calls == self.call_count: 

1548 break 

1549 

1550 def pareto_optimise(self) -> None: 

1551 if self.pareto_front is not None: 

1552 ParetoOptimiser(self).run() 

1553 

1554 def _run(self) -> None: 

1555 # have to use the primitive provider to interpret database bits... 

1556 self._switch_to_hypothesis_provider = True 

1557 with self._log_phase_statistics("reuse"): 

1558 self.reuse_existing_examples() 

1559 # Fast path for development: If the database gave us interesting 

1560 # examples from the previously stored primary key, don't try 

1561 # shrinking it again as it's unlikely to work. 

1562 if self.reused_previously_shrunk_test_case: 

1563 self.exit_with(ExitReason.finished) 

1564 # ...but we should use the supplied provider when generating... 

1565 self._switch_to_hypothesis_provider = False 

1566 with self._log_phase_statistics("generate"): 

1567 self.generate_new_examples() 

1568 # We normally run the targeting phase mixed in with the generate phase, 

1569 # but if we've been asked to run it but not generation then we have to 

1570 # run it explicitly on its own here. 

1571 if Phase.generate not in self.settings.phases: 

1572 self._current_phase = "target" 

1573 self.optimise_targets() 

1574 # ...and back to the primitive provider when shrinking. 

1575 self._switch_to_hypothesis_provider = True 

1576 with self._log_phase_statistics("shrink"): 

1577 self.shrink_interesting_examples() 

1578 self.exit_with(ExitReason.finished) 

1579 

1580 def new_conjecture_data( 

1581 self, 

1582 prefix: Sequence[ChoiceT | ChoiceTemplate], 

1583 *, 

1584 observer: DataObserver | None = None, 

1585 max_choices: int | None = None, 

1586 ) -> ConjectureData: 

1587 provider = ( 

1588 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1589 ) 

1590 observer = observer or self.tree.new_observer() 

1591 if not self.using_hypothesis_backend: 

1592 observer = DataObserver() 

1593 

1594 return ConjectureData( 

1595 prefix=prefix, 

1596 observer=observer, 

1597 provider=provider, 

1598 max_choices=max_choices, 

1599 random=self.random, 

1600 ) 

1601 

1602 def shrink_interesting_examples(self) -> None: 

1603 """If we've found interesting examples, try to replace each of them 

1604 with a minimal interesting example with the same interesting_origin. 

1605 

1606 We may find one or more examples with a new interesting_origin 

1607 during the shrink process. If so we shrink these too. 

1608 """ 

1609 if Phase.shrink not in self.settings.phases or not self.interesting_examples: 

1610 return 

1611 

1612 self.debug("Shrinking interesting examples") 

1613 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS 

1614 

1615 for prev_data in sorted( 

1616 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes) 

1617 ): 

1618 assert prev_data.status == Status.INTERESTING 

1619 data = self.new_conjecture_data(prev_data.choices) 

1620 self.test_function(data) 

1621 if data.status != Status.INTERESTING: 

1622 self.exit_with(ExitReason.flaky) 

1623 

1624 self.clear_secondary_key() 

1625 

1626 while len(self.shrunk_examples) < len(self.interesting_examples): 

1627 target, example = min( 

1628 ( 

1629 (k, v) 

1630 for k, v in self.interesting_examples.items() 

1631 if k not in self.shrunk_examples 

1632 ), 

1633 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))), 

1634 ) 

1635 self.debug(f"Shrinking {target!r}: {example.choices}") 

1636 

1637 if not self.settings.report_multiple_bugs: 

1638 # If multi-bug reporting is disabled, we shrink our currently-minimal 

1639 # failure, allowing 'slips' to any bug with a smaller minimal example. 

1640 self.shrink(example, lambda d: d.status == Status.INTERESTING) 

1641 return 

1642 

1643 def predicate(d: ConjectureResult | _Overrun) -> bool: 

1644 if d.status < Status.INTERESTING: 

1645 return False 

1646 d = cast(ConjectureResult, d) 

1647 return d.interesting_origin == target 

1648 

1649 self.shrink(example, predicate) 

1650 

1651 self.shrunk_examples.add(target) 

1652 

1653 def clear_secondary_key(self) -> None: 

1654 if self.has_existing_examples(): 

1655 # If we have any smaller examples in the secondary corpus, now is 

1656 # a good time to try them to see if they work as shrinks. They 

1657 # probably won't, but it's worth a shot and gives us a good 

1658 # opportunity to clear out the database. 

1659 

1660 # It's not worth trying the primary corpus because we already 

1661 # tried all of those in the initial phase. 

1662 corpus = sorted( 

1663 self.settings.database.fetch(self.secondary_key), key=shortlex 

1664 ) 

1665 for c in corpus: 

1666 choices = choices_from_bytes(c) 

1667 if choices is None: 

1668 self.settings.database.delete(self.secondary_key, c) 

1669 continue 

1670 primary = { 

1671 choices_to_bytes(v.choices) 

1672 for v in self.interesting_examples.values() 

1673 } 

1674 if shortlex(c) > max(map(shortlex, primary)): 

1675 break 

1676 

1677 self.cached_test_function(choices) 

1678 # We unconditionally remove c from the secondary key as it 

1679 # is either now primary or worse than our primary example 

1680 # of this reason for interestingness. 

1681 self.settings.database.delete(self.secondary_key, c) 

1682 

1683 def shrink( 

1684 self, 

1685 example: ConjectureData | ConjectureResult, 

1686 predicate: ShrinkPredicateT | None = None, 

1687 allow_transition: ( 

1688 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None 

1689 ) = None, 

1690 ) -> ConjectureData | ConjectureResult: 

1691 s = self.new_shrinker(example, predicate, allow_transition) 

1692 s.shrink() 

1693 return s.shrink_target 

1694 

1695 def new_shrinker( 

1696 self, 

1697 example: ConjectureData | ConjectureResult, 

1698 predicate: ShrinkPredicateT | None = None, 

1699 allow_transition: ( 

1700 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None 

1701 ) = None, 

1702 ) -> Shrinker: 

1703 return Shrinker( 

1704 self, 

1705 example, 

1706 predicate, 

1707 allow_transition=allow_transition, 

1708 explain=Phase.explain in self.settings.phases, 

1709 in_target_phase=self._current_phase == "target", 

1710 ) 

1711 

1712 def passing_choice_sequences( 

1713 self, prefix: Sequence[ChoiceNode] = () 

1714 ) -> frozenset[tuple[ChoiceNode, ...]]: 

1715 """Return a collection of choice sequence nodes which cause the test to pass. 

1716 Optionally restrict this by a certain prefix, which is useful for explain mode. 

1717 """ 

1718 return frozenset( 

1719 cast(ConjectureResult, result).nodes 

1720 for key in self.__data_cache 

1721 if (result := self.__data_cache[key]).status is Status.VALID 

1722 and startswith(cast(ConjectureResult, result).nodes, prefix) 

1723 ) 

1724 

1725 

1726class ContainsDiscard(Exception): 

1727 pass