Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/engine.py: 18%

725 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import math
import threading
import time
from collections import defaultdict
from collections.abc import Callable, Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random
from typing import Literal, NoReturn, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyBackendFailure,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observability_callback
from hypothesis.reporting import base_report, report, verbose_report

# In most cases, the following constants are all Final. However, we do allow users
# to monkeypatch all of these variables, which means we cannot annotate them as
# Final or mypyc will inline them and render monkeypatching useless.

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling down a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: int = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: int = 300
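
# Because these constants are deliberately left monkeypatchable, a user who
# needs to keep shrinking past the usual limits can rebind them before running
# their tests - a sketch, not a supported API:
#
#   from hypothesis.internal.conjecture import engine
#   engine.MAX_SHRINKS = 10_000
#   engine.MAX_SHRINKING_SECONDS = 3_600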

#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: int = 8 * 1024
CACHE_SIZE: int = 10000
MIN_TEST_CALLS: int = 10

# we use this to isolate Hypothesis from interacting with the global random,
# to make it easier to reason about our global random warning logic (see
# deprecate_random_in_strategy).
_random = Random()


def shortlex(s):
    return (len(s), s)
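
# A worked example of the ordering this key induces: sorting by shortlex puts
# shorter blobs first and breaks ties lexicographically, e.g.
#   sorted([b"c", b"ab", b"b"], key=shortlex) == [b"b", b"c", b"ab"]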

@dataclass(slots=True, frozen=False)
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: defaultdict[str, list[float]] = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)


def _invalid_thresholds(*, r: float, c: float) -> tuple[int, int]:
    base = math.ceil(math.log(1 - c) / math.log(1 - r)) - 1
    per_p = math.ceil(1 / r)
    return base, per_p


# stop once we're 99% confident the true valid rate is below 1%. See
# https://github.com/HypothesisWorks/hypothesis/issues/4623#issuecomment-3814681997
# for how we derived this formula.
INVALID_THRESHOLD_BASE, INVALID_PER_VALID = _invalid_thresholds(r=0.01, c=0.99)
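
# As a worked example of the formula above, with r=0.01 and c=0.99:
#   base  = ceil(log(1 - 0.99) / log(1 - 0.01)) - 1 = ceil(458.2...) - 1 = 458
#   per_p = ceil(1 / 0.01) = 100
# so (as checked in test_function and should_generate_more below) generation
# gives up once invalid_examples + overrun_examples exceeds
# 458 + 100 * valid_examples.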

class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 1% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
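
    # For example, with the default settings,
    #   ExitReason.max_examples.describe(settings)
    # renders to "settings.max_examples=100".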


class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> PrimitiveProvider | type[PrimitiveProvider]:
    provider_cls = AVAILABLE_PROVIDERS[backend]
    if isinstance(provider_cls, str):
        module_name, class_name = provider_cls.rsplit(".", 1)
        provider_cls = getattr(importlib.import_module(module_name), class_name)

    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
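
# A sketch of the registration this resolves (the backend name and module path
# here are hypothetical): AVAILABLE_PROVIDERS may map a backend name either to
# a PrimitiveProvider subclass or to its dotted path, which is imported lazily
# above so that merely listing a backend doesn't import it.
#
#   AVAILABLE_PROVIDERS["mybackend"] = "mypackage.provider.MyProvider"
#
# Providers with lifetime == "test_function" are instantiated once here and
# reused, while lifetime == "test_case" providers are returned as a class, to
# be instantiated anew for each test case.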


class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[ChoiceT | ChoiceTemplate]) -> int | None:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
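
# For example, two concrete choices plus a bounded template:
#   choice_count([0, b"", ChoiceTemplate("simplest", count=3)]) == 5
# while any template with count=None makes the total unknowable up front:
#   choice_count([ChoiceTemplate("simplest", count=None)]) is None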


class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    for node in data.nodes:
        value = data.provider.realize(node.value, for_failure=for_failure)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, for_failure=for_failure)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints
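
# For a symbolic backend such as crosshair, realize() maps a symbolic value to
# a concrete one of the same choice type, e.g. a symbolic integer to a plain
# int. The default PrimitiveProvider.realize simply returns the value as-is,
# so this loop only does real work for alternative backends (and is only
# invoked for them; see test_function below).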


class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Settings | None = None,
        random: Random | None = None,
        database_key: bytes | None = None,
        ignore_limits: bool = False,
        thread_overlap: dict[int, bool] | None = None,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: float | None = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(_random.getrandbits(128))
        self.database_key: bytes | None = database_key
        self.ignore_limits: bool = ignore_limits
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: int | None = None
        self.last_bug_found_at: int | None = None
        self.first_bug_found_time: float = math.inf

        self.shrunk_examples: set[InterestingOrigin] = set()
        self.health_check_state: HealthCheckState | None = None
        self.tree: DataTree = DataTree()
        self.provider: PrimitiveProvider | type[PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: ParetoFront | None = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a choice sequence without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], ConjectureResult | _Overrun
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: str | None = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._backend_discard_count: int = 0
        # note unsound verification by alt backends
        self._verified_by_backend: str | None = None
        self._switch_to_hypothesis_provider: bool = False

    @contextmanager
    def _with_switch_to_hypothesis_provider(
        self, value: bool
    ) -> Generator[None, None, None]:
        previous = self._switch_to_hypothesis_provider
        try:
            self._switch_to_hypothesis_provider = value
            yield
        finally:
            self._switch_to_hypothesis_provider = previous

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[ChoiceT | ChoiceTemplate],
        *,
        error_on_discard: bool = False,
        extend: int | Literal["full"] = 0,
    ) -> ConjectureResult | _Overrun:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        we may not raise ``ContainsDiscard`` even if the result has discards,
        if we cannot determine from previous runs whether it will have one.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing extensions
                # of the nodes, it could in fact run to a valid data if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: DataObserver | None = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this result
                # for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
            try:
                return self.__data_cache[key]
            except KeyError:
                pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us.
        self.test_function(data)
        return data.as_result()
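
    # For example, reuse_existing_examples below replays stored failures with
    #   self.cached_test_function(choices, extend="full")
    # so a saved choice sequence still replays even if the strategy now draws
    # more choices than were recorded.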

    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        def _backend_cannot_proceed(
            exc: BackendCannotProceed, data: ConjectureData
        ) -> None:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by_backend = self.settings.backend
            elif exc.scope == "discard_test_case":
                self._backend_discard_count += 1
                if (
                    self._backend_discard_count > 10
                    and (self._backend_discard_count / self.call_count) > 0.2
                ):
                    verbose_report(
                        f"Switching away from backend {self.settings.backend!r} "
                        "to the Hypothesis backend, "
                        f"because {self._backend_discard_count} of {self.call_count} "
                        "attempted test cases "
                        f"({self._backend_discard_count / self.call_count * 100:0.1f}%) "
                        f"were discarded by backend {self.settings.backend!r}"
                    )
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1
            data.cannot_proceed_scope = exc.scope

        # this fiddly bit of control flow is to work around `return` being
        # disallowed in `finally` blocks as of python 3.14. Otherwise, we would
        # just return in the _backend_cannot_proceed branch.
        finally_early_return = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            _backend_cannot_proceed(exc, data)
            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                try:
                    realize_choices(data, for_failure=True)
                except BackendCannotProceed as exc:
                    _backend_cannot_proceed(exc, data)
                    # skip the post-test-case tracking; we're pretending this
                    # never happened
                    interrupted = True
                    return
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()

                if self.settings.backend != "hypothesis":
                    try:
                        realize_choices(
                            data, for_failure=data.status is Status.INTERESTING
                        )
                    except BackendCannotProceed as exc:
                        _backend_cannot_proceed(exc, data)
                        finally_early_return = True

                if not finally_early_return:
                    call_stats: CallStats = {
                        "status": data.status.name.lower(),
                        "runtime": data.finish_time - data.start_time,
                        "drawtime": math.fsum(data.draw_times.values()),
                        "gctime": data.gc_finish_time - data.gc_start_time,
                        "events": sorted(
                            k if v == "" else f"{k}: {v}"
                            for k, v in data.events.items()
                        ),
                    }
                    self.stats_per_test_case.append(call_stats)

                    self._cache(data)
                    if (
                        data.misaligned_at is not None
                    ):  # pragma: no branch # coverage bug?
                        self.misaligned_count += 1

        if finally_early_return:
            return

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)
                # we're already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.
                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

            self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
                    self.first_bug_found_time = time.monotonic()
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if (self.invalid_examples + self.overrun_examples) > (
                INVALID_THRESHOLD_BASE + INVALID_PER_VALID * self.valid_examples
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)

    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Generated inputs routinely consumed more than the maximum "
                f"allowed entropy: {state.valid_examples} inputs were generated "
                f"successfully, while {state.overrun_examples} inputs exceeded the "
                f"maximum allowed entropy during generation."
                "\n\n"
                f"Testing with inputs this large tends to be slow, and to produce "
                "failures that are both difficult to shrink and difficult to understand. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n\n"
                "If you expect the average size of your input to be this large, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like this test is filtering out a lot of inputs. "
                f"{state.valid_examples} inputs were generated successfully, "
                f"while {state.invalid_examples} inputs were filtered out. "
                "\n\n"
                "An input might be filtered out by calls to assume(), "
                "strategy.filter(...), or occasionally by Hypothesis internals."
                "\n\n"
                "Applying this much filtering makes input generation slow, since "
                "Hypothesis must discard inputs which are filtered out and try "
                "generating new ones. It is also possible that applying this much "
                "filtering will distort the domain and/or distribution of the test, "
                "leaving your testing less rigorous than expected."
                "\n\n"
                "If you expect this many inputs to be filtered out during generation, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.filter_too_much,
            )

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time = state.total_draw_time
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if (
            draw_time > max(1.0, draw_time_limit.total_seconds())
            # we disable HealthCheck.too_slow under concurrent threads, since
            # cpython may switch away from a thread for arbitrarily long.
            and not self.thread_overlap.get(threading.get_ident(), False)
        ):
            extra_str = []
            if state.invalid_examples:
                extra_str.append(f"{state.invalid_examples} invalid inputs")
            if state.overrun_examples:
                extra_str.append(
                    f"{state.overrun_examples} inputs which exceeded the "
                    "maximum allowed entropy"
                )
            extra_str = ", and ".join(extra_str)
            extra_str = f" ({extra_str})" if extra_str else ""

            fail_health_check(
                self.settings,
                "Input generation is slow: Hypothesis only generated "
                f"{state.valid_examples} valid inputs after {draw_time:.2f} "
                f"seconds{extra_str}."
                "\n" + state.timing_report() + "\n\n"
                "This could be for a few reasons:"
                "\n"
                "1. This strategy could be generating too much data per input. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n"
                "2. Some other expensive computation could be running during input "
                "generation. For example, "
                "if @st.composite or st.data() is interspersed with an expensive "
                "computation, HealthCheck.too_slow is likely to trigger. If this "
                "computation is unrelated to input generation, move it elsewhere. "
                "Otherwise, try making it more efficient, or disable this health "
                "check if that is not possible."
                "\n\n"
                "If you expect input generation to take this long, you can disable "
                "this health check with "
                "@settings(suppress_health_check=[HealthCheck.too_slow]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.too_slow,
            )

    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: bytes | None = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: bytes | None) -> bytes | None:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))
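
    # For example, with database_key=b"abc123" the derived corpus keys are:
    #   sub_key(None)         == b"abc123"            (primary, minimal failures)
    #   sub_key(b"secondary") == b"abc123.secondary"  (previously-interesting)
    #   sub_key(b"pareto")    == b"abc123.pareto"     (pareto front entries)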

    @property
    def secondary_key(self) -> bytes | None:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> bytes | None:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: ConjectureData | ConjectureResult) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"
        elif data.status == Status.INVALID and isinstance(data, ConjectureData):
            assert isinstance(data, ConjectureData)  # mypy is silly
            status = f"{status} ({data.events.get('invalid because', '?')})"

        newline_tab = "\n\t"
        self.debug(
            f"{len(data.choices)} choices -> {status}\n\t{data.choices}"
            f"{newline_tab + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observability_callback(on_observation)
        return nullcontext()
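
    # A minimal sketch of a provider opting in to these observations (the class
    # here is hypothetical, not part of Hypothesis):
    #
    #   class MyProvider(PrimitiveProvider):
    #       lifetime = "test_function"
    #       add_observability_callback = True
    #
    #       def on_observation(self, observation: Observation) -> None:
    #           ...  # called once per test case run under this backend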

    def run(self) -> None:
        with local_settings(self.settings), self.observe_for_provider():
            try:
                self._run()
            except RunIsComplete:
                pass
            for v in self.interesting_examples.values():
                self.debug_data(v)
            self.debug(
                f"Run complete after {self.call_count} examples "
                f"({self.valid_examples} valid) and {self.shrinks} shrinks"
            )

    @property
    def database(self) -> ExampleDatabase | None:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break

    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        invalid_threshold = (
            INVALID_THRESHOLD_BASE + INVALID_PER_VALID * self.valid_examples
        )
        if (
            self.valid_examples >= self.settings.max_examples
            or (self.invalid_examples + self.overrun_examples) > invalid_threshold
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        # If we first saw a bug more than 10 seconds ago, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
            or time.monotonic() - self.first_bug_found_time > 10
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )
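
    # A worked example of that cap: if the first bug appeared on call 100 and
    # the most recent new bug on call 150, we keep generating until
    # min(100 + 1000, 150 * 2) == 300 calls, i.e. we stop once the latest half
    # of our effort has found nothing new (or after 1000 extra calls).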

    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, and so
            # it's only a very slight performance loss to simply not pin it
            # if doing so would error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural input for this test is very "
                "large. This makes it difficult for Hypothesis to generate "
                "good inputs, especially when trying to shrink failing inputs."
                "\n\n"
                "Consider reducing the amount of data generated by the strategy. "
                "Also consider introducing small alternative values for some "
                "strategies. For example, could you "
                "mark some arguments as optional by replacing `some_complex_strategy` "
                "with `st.none() | some_complex_strategy`?"
                "\n\n"
                "If you are confident that the size of the smallest natural input "
                "to your test cannot be reduced, you can suppress this health check "
                "with @settings(suppress_health_check=[HealthCheck.large_base_example]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()
        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
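        # As a worked example: with the default max_examples=100 this gives
        # small_example_cap = min(10, 50) == 10 and
        # optimise_at = max(50, 11, 10) == 50, so the target phase (if enabled)
        # kicks in halfway through the run.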

        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the Status code is at least Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(self, data: ConjectureData | ConjectureResult) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover  # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #                 ┌─────┐
                    #         ┌─────┤  a  ├─────┐
                    #         │     └─────┘     │
                    #      ┌──┴──┐           ┌──┴──┐
                    #   ┌──┤  b  ├──┐     ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │     │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace c with a (replace child with parent)
                    # 2. replace a with c (replace parent with child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #                 ┌─────┐
                    #         ┌─────┤  a  ├─────────────┐
                    #         │     └─────┘             │
                    #      ┌──┴──┐                    ┌─┴───┐
                    #   ┌──┤  b  ├──┐           ┌─────┤  a  ├─────┐
                    #   │  └──┬──┘  │           │     └─────┘     │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐      ┌──┴──┐           ┌──┴──┐
                    # │ d │ │ e │ │ f │   ┌──┤  b  ├──┐     ┌──┤  c  ├──┐
                    # └───┘ └───┘ └───┘   │  └──┬──┘  │     │  └──┬──┘  │
                    #                   ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #                   │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
                    #                   └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #      ┌─────┐
                    #   ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]
                else:
                    start, end = self.random.choice([(start1, end1), (start2, end2)])
                    replacement = data.choices[start:end]
1437 # We attempt to replace both spans with the 

1438 # one we picked at random. Note that this might 

1439 # end up getting the span boundaries 

1440 # wrong - label matching is only a best guess as to 

1441 # whether the two are equivalent - but it doesn't 

1442 # really matter. It may not achieve the desired result, 

1443 # but it's still a perfectly acceptable choice sequence 

1444 # to try. 

1445 attempt = ( 

1446 data.choices[:start1] 

1447 + replacement 

1448 + data.choices[end1:start2] 

1449 + replacement 

1450 + data.choices[end2:] 

1451 ) 

1452 

1453 try: 

1454 new_data = self.cached_test_function( 

1455 attempt, 

1456 # We set error_on_discard so that we don't end up 

1457 # entering parts of the tree we consider redundant 

1458 # and not worth exploring. 

1459 error_on_discard=True, 

1460 ) 

1461 except ContainsDiscard: 

1462 failed_mutations += 1 

1463 continue 

1464 

1465 if new_data is Overrun: 

1466 failed_mutations += 1 # pragma: no cover # annoying case 

1467 else: 

1468 assert isinstance(new_data, ConjectureResult) 

1469 if ( 

1470 new_data.status >= data.status 

1471 and choices_key(data.choices) != choices_key(new_data.choices) 

1472 and all( 

1473 k in new_data.target_observations 

1474 and new_data.target_observations[k] >= v 

1475 for k, v in data.target_observations.items() 

1476 ) 

1477 ): 

1478 data = new_data 

1479 failed_mutations = 0 

1480 else: 

1481 failed_mutations += 1 

1482 
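# A minimal sketch of the two attempt constructions above, using a
# plain list in place of data.choices. Values and indices are
# illustrative stand-ins, not Hypothesis API.
_choices = ["a", "b", "d", "e", "c", "g", "h"]

# Nested case: span (start1, end1) contains (start2, end2), so we
# duplicate choices[start1:start2] in front of the contained span,
# producing a partial second copy of the parent (mutation (1) above).
_start1, _start2 = 0, 4  # the "a" subtree contains the "c" subtree
assert _choices[:_start2] + _choices[_start1:] == [
    "a", "b", "d", "e",                  # parent up to the child...
    "a", "b", "d", "e", "c", "g", "h",   # ...then the parent again
]

# Disjoint case: copy one span's choices over both spans.
_s1, _e1, _s2, _e2 = 1, 2, 4, 5  # the "b" and "c" spans
_replacement = _choices[_s2:_e2]
assert (
    _choices[:_s1] + _replacement + _choices[_e1:_s2]
    + _replacement + _choices[_e2:]
) == ["a", "c", "d", "e", "c", "g", "h"]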

1483 def optimise_targets(self) -> None: 

1484 """If any target observations have been made, attempt to optimise them 

1485 all.""" 

1486 if not self.should_optimise: 

1487 return 

1488 from hypothesis.internal.conjecture.optimiser import Optimiser 

1489 

1490 # We want to avoid running the optimiser for too long in case we hit 

1491 # an unbounded target score. We start this off fairly conservatively 

1492 # in case interesting examples are easy to find and then ramp it up 

1493 # on an exponential schedule so we don't hamper the optimiser too much 

1494 # if it needs a long time to find good enough improvements. 

1495 max_improvements = 10 

1496 while True: 

1497 prev_calls = self.call_count 

1498 

1499 any_improvements = False 

1500 

1501 for target, data in list(self.best_examples_of_observed_targets.items()): 

1502 optimiser = Optimiser( 

1503 self, data, target, max_improvements=max_improvements 

1504 ) 

1505 optimiser.run() 

1506 if optimiser.improvements > 0: 

1507 any_improvements = True 

1508 

1509 if self.interesting_examples: 

1510 break 

1511 

1512 max_improvements *= 2 

1513 

1514 if any_improvements: 

1515 continue 

1516 

1517 if self.best_observed_targets: 

1518 self.pareto_optimise() 

1519 

1520 if prev_calls == self.call_count: 

1521 break 

1522 
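# A sketch of the ramp-up schedule above: the per-target improvement
# cap starts small and doubles after every pass, and the loop keeps
# going while passes still find improvements, so an unbounded target
# score cannot monopolise the early budget. optimise_pass stands in
# for one full pass over the observed targets; this simplification
# omits the interesting-example and call-count exits.
def _ramp_up(optimise_pass, max_improvements=10):
    while True:
        improved = optimise_pass(max_improvements)
        max_improvements *= 2
        if not improved:
            return

_caps = []
_ramp_up(lambda cap: _caps.append(cap) or cap < 40)
assert _caps == [10, 20, 40]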

1523 def pareto_optimise(self) -> None: 

1524 if self.pareto_front is not None: 

1525 ParetoOptimiser(self).run() 

1526 

1527 def _run(self) -> None: 

1528 # have to use the primitive provider to interpret database bits... 

1529 self._switch_to_hypothesis_provider = True 

1530 with self._log_phase_statistics("reuse"): 

1531 self.reuse_existing_examples() 

1532 # Fast path for development: If the database gave us interesting 

1533 # examples from the previously stored primary key, don't try 

1534 # shrinking them again, as that's unlikely to help. 

1535 if self.reused_previously_shrunk_test_case: 

1536 self.exit_with(ExitReason.finished) 

1537 # ...but we should use the supplied provider when generating... 

1538 self._switch_to_hypothesis_provider = False 

1539 with self._log_phase_statistics("generate"): 

1540 self.generate_new_examples() 

1541 # We normally run the targeting phase mixed in with the generate phase, 

1542 # but if we've been asked to run targeting without generation then 

1543 # we have to run it explicitly on its own here. 

1544 if Phase.generate not in self.settings.phases: 

1545 self._current_phase = "target" 

1546 self.optimise_targets() 

1547 # ...and back to the primitive provider when shrinking. 

1548 self._switch_to_hypothesis_provider = True 

1549 with self._log_phase_statistics("shrink"): 

1550 self.shrink_interesting_examples() 

1551 self.exit_with(ExitReason.finished) 

1552 
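# A sketch of the provider switching in _run above: generation may use
# an alternative backend, but database replay and shrinking always go
# through the Hypothesis provider, which is what can faithfully
# reinterpret stored choice sequences. Names below are stand-ins.
class _ProviderToggle:
    def __init__(self, backend):
        self.backend = backend
        self.use_hypothesis = True  # reuse phase

    @property
    def active(self):
        return "hypothesis" if self.use_hypothesis else self.backend

_toggle = _ProviderToggle(backend="some-backend")
assert _toggle.active == "hypothesis"   # reuse
_toggle.use_hypothesis = False          # generate
assert _toggle.active == "some-backend"
_toggle.use_hypothesis = True           # shrink
assert _toggle.active == "hypothesis"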

1553 def new_conjecture_data( 

1554 self, 

1555 prefix: Sequence[ChoiceT | ChoiceTemplate], 

1556 *, 

1557 observer: DataObserver | None = None, 

1558 max_choices: int | None = None, 

1559 ) -> ConjectureData: 

1560 provider = ( 

1561 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1562 ) 

1563 observer = observer or self.tree.new_observer() 

1564 if not self.using_hypothesis_backend: 

1565 observer = DataObserver() 

1566 

1567 return ConjectureData( 

1568 prefix=prefix, 

1569 observer=observer, 

1570 provider=provider, 

1571 max_choices=max_choices, 

1572 random=self.random, 

1573 ) 

1574 

1575 def shrink_interesting_examples(self) -> None: 

1576 """If we've found interesting examples, try to replace each of them 

1577 with a minimal interesting example with the same interesting_origin. 

1578 

1579 We may find one or more examples with a new interesting_origin 

1580 during the shrink process. If so we shrink these too. 

1581 """ 

1582 if Phase.shrink not in self.settings.phases or not self.interesting_examples: 

1583 return 

1584 

1585 self.debug("Shrinking interesting examples") 

1586 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS 

1587 

1588 for prev_data in sorted( 

1589 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes) 

1590 ): 

1591 assert prev_data.status == Status.INTERESTING 

1592 data = self.new_conjecture_data(prev_data.choices) 

1593 self.test_function(data) 

1594 if data.status != Status.INTERESTING: 

1595 self.exit_with(ExitReason.flaky) 

1596 

1597 self.clear_secondary_key() 

1598 

1599 while len(self.shrunk_examples) < len(self.interesting_examples): 

1600 target, example = min( 

1601 ( 

1602 (k, v) 

1603 for k, v in self.interesting_examples.items() 

1604 if k not in self.shrunk_examples 

1605 ), 

1606 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))), 

1607 ) 

1608 self.debug(f"Shrinking {target!r}: {example.choices}") 

1609 

1610 if not self.settings.report_multiple_bugs: 

1611 # If multi-bug reporting is disabled, we shrink our currently-minimal 

1612 # failure, allowing 'slips' to any bug with a smaller minimal example. 

1613 self.shrink(example, lambda d: d.status == Status.INTERESTING) 

1614 return 

1615 

1616 def predicate(d: ConjectureResult | _Overrun) -> bool: 

1617 if d.status < Status.INTERESTING: 

1618 return False 

1619 d = cast(ConjectureResult, d) 

1620 return d.interesting_origin == target 

1621 

1622 self.shrink(example, predicate) 

1623 

1624 self.shrunk_examples.add(target) 

1625 
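# A sketch of the target-selection key above. shortlex, used above as
# a length-then-lexicographic sort key and mirrored here as _shortlex,
# breaks ties between equally-simple examples by the repr of their
# interesting origin, keeping the shrink order deterministic. Stand-in
# sizes replace sort_key(nodes) here.
def _shortlex(s):
    return (len(s), s)

_pending = {"ZeroDivisionError": 5, "AssertionError": 5}
_origin, _size = min(
    _pending.items(),
    key=lambda kv: (kv[1], _shortlex(repr(kv[0]))),
)
assert _origin == "AssertionError"  # equal size; shorter repr wins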

1626 def clear_secondary_key(self) -> None: 

1627 if self.has_existing_examples(): 

1628 # If we have any smaller examples in the secondary corpus, now is 

1629 # a good time to try them to see if they work as shrinks. They 

1630 # probably won't, but it's worth a shot and gives us a good 

1631 # opportunity to clear out the database. 

1632 

1633 # It's not worth trying the primary corpus because we already 

1634 # tried all of those in the initial phase. 

1635 corpus = sorted( 

1636 self.settings.database.fetch(self.secondary_key), key=shortlex 

1637 ) 

1638 for c in corpus: 

1639 choices = choices_from_bytes(c) 

1640 if choices is None: 

1641 self.settings.database.delete(self.secondary_key, c) 

1642 continue 

1643 primary = { 

1644 choices_to_bytes(v.choices) 

1645 for v in self.interesting_examples.values() 

1646 } 

1647 if shortlex(c) > max(map(shortlex, primary)): 

1648 break 

1649 

1650 self.cached_test_function(choices) 

1651 # We unconditionally remove c from the secondary key as it 

1652 # is either now primary or worse than our primary example 

1653 # of this reason for interestingness. 

1654 self.settings.database.delete(self.secondary_key, c) 

1655 
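# A sketch of the early exit in clear_secondary_key above: because the
# corpus is visited in shortlex order, the first entry that is
# shortlex-larger than every primary example proves the rest are too,
# so the scan stops there. The byte strings are illustrative only.
def _shortlex(s):
    return (len(s), s)

_primary = [b"abc", b"ab"]
_corpus = sorted([b"a", b"zz", b"abcd", b"abcde"], key=_shortlex)
_tried = []
for _c in _corpus:
    if _shortlex(_c) > max(map(_shortlex, _primary)):
        break
    _tried.append(_c)
assert _tried == [b"a", b"zz"]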

1656 def shrink( 

1657 self, 

1658 example: ConjectureData | ConjectureResult, 

1659 predicate: ShrinkPredicateT | None = None, 

1660 allow_transition: ( 

1661 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None 

1662 ) = None, 

1663 ) -> ConjectureData | ConjectureResult: 

1664 s = self.new_shrinker(example, predicate, allow_transition) 

1665 s.shrink() 

1666 return s.shrink_target 

1667 
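# A sketch of predicate-driven shrinking as used above: the shrinker
# keeps any smaller candidate for which the predicate still holds and
# returns the last such candidate. This toy halving loop is
# illustrative only; the real Shrinker operates on choice sequences,
# not integers.
def _shrink_int(value, predicate):
    while predicate(value // 2):
        value //= 2
    return value

# Shrinks 1000 toward the smallest reachable value still >= 10.
assert _shrink_int(1000, lambda v: v >= 10) == 15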

1668 def new_shrinker( 

1669 self, 

1670 example: ConjectureData | ConjectureResult, 

1671 predicate: ShrinkPredicateT | None = None, 

1672 allow_transition: ( 

1673 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None 

1674 ) = None, 

1675 ) -> Shrinker: 

1676 return Shrinker( 

1677 self, 

1678 example, 

1679 predicate, 

1680 allow_transition=allow_transition, 

1681 explain=Phase.explain in self.settings.phases, 

1682 in_target_phase=self._current_phase == "target", 

1683 ) 

1684 

1685 def passing_choice_sequences( 

1686 self, prefix: Sequence[ChoiceNode] = () 

1687 ) -> frozenset[tuple[ChoiceNode, ...]]: 

1688 """Return a collection of choice sequence nodes which cause the test to pass. 

1689 Optionally restrict this by a certain prefix, which is useful for explain mode. 

1690 """ 

1691 return frozenset( 

1692 cast(ConjectureResult, result).nodes 

1693 for key in self.__data_cache 

1694 if (result := self.__data_cache[key]).status is Status.VALID 

1695 and startswith(cast(ConjectureResult, result).nodes, prefix) 

1696 ) 

1697 

1698 
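# A sketch of the prefix filter in passing_choice_sequences above, with
# tuples of ints standing in for choice nodes and a dict standing in
# for the data cache. _startswith is a plain "sequence begins with
# prefix" check standing in for the imported junkdrawer helper.
def _startswith(seq, prefix):
    return seq[: len(prefix)] == prefix

_cache = {
    "k1": ("VALID", (1, 2, 3)),
    "k2": ("VALID", (1, 9)),
    "k3": ("INTERESTING", (1, 2, 4)),
}
assert frozenset(
    nodes
    for status, nodes in _cache.values()
    if status == "VALID" and _startswith(nodes, (1, 2))
) == {(1, 2, 3)}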

1699class ContainsDiscard(Exception): 

1700 pass