Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/engine.py: 17%

706 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import threading
import time
from collections import defaultdict
from collections.abc import Callable, Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random
from typing import Literal, NoReturn, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyBackendFailure,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observability_callback
from hypothesis.reporting import base_report, report

# In most cases, the following constants are all Final. However, we do allow users
# to monkeypatch all of these variables, which means we cannot annotate them as
# Final or mypyc will inline them and render monkeypatching useless.

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling down a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: int = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: int = 300

#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: int = 8 * 1024

CACHE_SIZE: int = 10000
MIN_TEST_CALLS: int = 10
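# Illustrative sketch (not part of the engine; the names are real, the values
# made up): because the constants above are deliberately monkeypatchable, a
# user who needs to keep shrinking past the usual limits could do e.g.
#
#   import hypothesis.internal.conjecture.engine as engine
#   engine.MAX_SHRINKS = 10_000
#   engine.MAX_SHRINKING_SECONDS = 3_600  # shrink for up to an hour
#
# before running their tests.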

# we use this to isolate Hypothesis from interacting with the global random,
# to make it easier to reason about our global random warning logic (see
# deprecate_random_in_strategy).
_random = Random()

def shortlex(s):
    return (len(s), s)
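# Illustrative example (not part of the engine): shortlex sorts first by
# length, then lexicographically, e.g.
#
#   >>> sorted([b"ba", b"c", b"ab"], key=shortlex)
#   [b'c', b'ab', b'ba']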

@dataclass(slots=True, frozen=False)
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: defaultdict[str, list[float]] = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)
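# Illustrative sketch of the report built above (not part of the engine;
# alignment and values made up): one row per draw label, showing its call
# count, its share of total draw time, and up to five of its slowest draws,
# e.g.
#
#       count | fraction | slowest draws (seconds)
#   n |   100 |     95%  |  0.102, 0.150, 0.201, 0.310, 0.452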

class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
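# Illustrative example (not part of the engine): describe() interpolates the
# active settings into the reason template, e.g.
#
#   >>> ExitReason.max_examples.describe(Settings(max_examples=100))
#   'settings.max_examples=100'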

class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> PrimitiveProvider | type[PrimitiveProvider]:
    provider_cls = AVAILABLE_PROVIDERS[backend]
    if isinstance(provider_cls, str):
        module_name, class_name = provider_cls.rsplit(".", 1)
        provider_cls = getattr(importlib.import_module(module_name), class_name)

    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
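# Illustrative sketch (hypothetical backend, not shipped with Hypothesis):
# AVAILABLE_PROVIDERS entries may be dotted-path strings, which _get_provider
# resolves lazily via importlib, e.g.
#
#   AVAILABLE_PROVIDERS["mybackend"] = "mypackage.provider.MyProvider"
#
# A provider with lifetime "test_function" is instantiated once here and
# reused; one with lifetime "test_case" is returned as a class and
# instantiated per test case.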

class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[ChoiceT | ChoiceTemplate]) -> int | None:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
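# Illustrative example (not part of the engine): concrete choices count as one
# each, templates contribute their count, and an unbounded template makes the
# total indeterminate:
#
#   >>> choice_count((1, True, ChoiceTemplate("simplest", count=3)))
#   5
#   >>> choice_count((1, ChoiceTemplate("simplest", count=None))) is None
#   True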

class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints
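# Illustrative sketch (hypothetical provider, not a real backend): the
# signature check above means an up-to-date provider should accept the
# keyword, e.g.
#
#   class MyProvider(PrimitiveProvider):
#       def realize(self, value, *, for_failure=False):
#           # resolve a symbolic value to a concrete one, possibly trying
#           # harder when realizing a failing test case
#           ...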

class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Settings | None = None,
        random: Random | None = None,
        database_key: bytes | None = None,
        ignore_limits: bool = False,
        thread_overlap: dict[int, bool] | None = None,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: float | None = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(_random.getrandbits(128))
        self.database_key: bytes | None = database_key
        self.ignore_limits: bool = ignore_limits
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: int | None = None
        self.last_bug_found_at: int | None = None
        self.first_bug_found_time: float = math.inf

        self.shrunk_examples: set[InterestingOrigin] = set()
        self.health_check_state: HealthCheckState | None = None
        self.tree: DataTree = DataTree()
        self.provider: PrimitiveProvider | type[PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: ParetoFront | None = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a choice sequence without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], ConjectureResult | _Overrun
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: str | None = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: str | None = None

    @contextmanager
    def _with_switch_to_hypothesis_provider(
        self, value: bool
    ) -> Generator[None, None, None]:
        previous = self._switch_to_hypothesis_provider
        try:
            self._switch_to_hypothesis_provider = value
            yield
        finally:
            self._switch_to_hypothesis_provider = previous

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[ChoiceT | ChoiceTemplate],
        *,
        error_on_discard: bool = False,
        extend: int | Literal["full"] = 0,
    ) -> ConjectureResult | _Overrun:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing extensions
                # of the nodes, it could in fact run to a valid data if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: DataObserver | None = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:

                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this
                # result for simulation-less lookup later).

                self.__data_cache[key] = Overrun
                return Overrun
            try:
                return self.__data_cache[key]
            except KeyError:
                pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us.
        self.test_function(data)
        return data.as_result()
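    # Illustrative example (not part of the engine): with extend=0, a cached
    # overrun for a choice sequence is returned as-is; with a numeric extend,
    # cached_test_function((1, True), extend=2) may run with up to
    # count + extend == 4 choices; and extend="full" removes the length limit
    # entirely.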

    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()

                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)

                # we're already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.

                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

                self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
                    self.first_bug_found_time = time.monotonic()
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)

    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Generated inputs routinely consumed more than the maximum "
                f"allowed entropy: {state.valid_examples} inputs were generated "
                f"successfully, while {state.overrun_examples} inputs exceeded the "
                f"maximum allowed entropy during generation."
                "\n\n"
                f"Testing with inputs this large tends to be slow, and to produce "
                "failures that are both difficult to shrink and difficult to understand. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n\n"
                "If you expect the average size of your input to be this large, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like this test is filtering out a lot of inputs. "
                f"{state.valid_examples} inputs were generated successfully, "
                f"while {state.invalid_examples} inputs were filtered out. "
                "\n\n"
                "An input might be filtered out by calls to assume(), "
                "strategy.filter(...), or occasionally by Hypothesis internals."
                "\n\n"
                "Applying this much filtering makes input generation slow, since "

799 "Hypothesis must discard inputs which are filtered out and try " 

800 "generating it again. It is also possible that applying this much " 

801 "filtering will distort the domain and/or distribution of the test, " 

802 "leaving your testing less rigorous than expected." 

803 "\n\n" 

804 "If you expect this many inputs to be filtered out during generation, " 

805 "you can disable this health check with " 

806 "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See " 

807 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

808 "for details.", 

809 HealthCheck.filter_too_much, 

810 ) 

811 

812 # Allow at least the greater of one second or 5x the deadline. If deadline 

813 # is None, allow 30s - the user can disable the healthcheck too if desired. 

814 draw_time = state.total_draw_time 

815 draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6)) 

816 if ( 

817 draw_time > max(1.0, draw_time_limit.total_seconds()) 

818 # we disable HealthCheck.too_slow under concurrent threads, since 

819 # cpython may switch away from a thread for arbitrarily long. 

820 and not self.thread_overlap.get(threading.get_ident(), False) 

821 ): 

822 extra_str = [] 

823 if state.invalid_examples: 

824 extra_str.append(f"{state.invalid_examples} invalid inputs") 

825 if state.overrun_examples: 

826 extra_str.append( 

827 f"{state.overrun_examples} inputs which exceeded the " 

828 "maximum allowed entropy" 

829 ) 

830 extra_str = ", and ".join(extra_str) 

831 extra_str = f" ({extra_str})" if extra_str else "" 

832 

833 fail_health_check( 

834 self.settings, 

835 "Input generation is slow: Hypothesis only generated " 

836 f"{state.valid_examples} valid inputs after {draw_time:.2f} " 

837 f"seconds{extra_str}." 

838 "\n" + state.timing_report() + "\n\n" 

839 "This could be for a few reasons:" 

840 "\n" 

841 "1. This strategy could be generating too much data per input. " 

842 "Try decreasing the amount of data generated, for example by " 

843 "decreasing the minimum size of collection strategies like " 

844 "st.lists()." 

845 "\n" 

846 "2. Some other expensive computation could be running during input " 

847 "generation. For example, " 

848 "if @st.composite or st.data() is interspersed with an expensive " 

849 "computation, HealthCheck.too_slow is likely to trigger. If this " 

850 "computation is unrelated to input generation, move it elsewhere. " 

851 "Otherwise, try making it more efficient, or disable this health " 

852 "check if that is not possible." 

853 "\n\n" 

854 "If you expect input generation to take this long, you can disable " 

855 "this health check with " 

856 "@settings(suppress_health_check=[HealthCheck.too_slow]). See " 

857 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

858 "for details.", 

859 HealthCheck.too_slow, 

860 ) 

861 

862 def save_choices( 

863 self, choices: Sequence[ChoiceT], sub_key: bytes | None = None 

864 ) -> None: 

865 if self.settings.database is not None: 

866 key = self.sub_key(sub_key) 

867 if key is None: 

868 return 

869 self.settings.database.save(key, choices_to_bytes(choices)) 

870 

871 def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None: 

872 buffer = choices_to_bytes(choices) 

873 if self.settings.database is not None and self.database_key is not None: 

874 self.settings.database.move(self.database_key, self.secondary_key, buffer) 

875 

876 def sub_key(self, sub_key: bytes | None) -> bytes | None: 

877 if self.database_key is None: 

878 return None 

879 if sub_key is None: 

880 return self.database_key 

881 return b".".join((self.database_key, sub_key)) 

882 
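    # Illustrative example (not part of the engine): with database_key=b"abc",
    # sub_key(None) == b"abc", sub_key(b"secondary") == b"abc.secondary", and
    # sub_key(b"pareto") == b"abc.pareto"; with no database_key, all are None.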

    @property
    def secondary_key(self) -> bytes | None:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> bytes | None:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: ConjectureData | ConjectureResult) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observability_callback(on_observation)
        return nullcontext()

    def run(self) -> None:
        with local_settings(self.settings), self.observe_for_provider():
            try:
                self._run()
            except RunIsComplete:
                pass
            for v in self.interesting_examples.values():
                self.debug_data(v)
            self.debug(
                f"Run complete after {self.call_count} examples "
                f"({self.valid_examples} valid) and {self.shrinks} shrinks"
            )

    @property
    def database(self) -> ExampleDatabase | None:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break
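    # Illustrative arithmetic (not part of the engine): with max_examples=100
    # and the generate phase enabled, factor is 0.1, so desired_size is
    # max(2, ceil(0.1 * 100)) == 10; any shortfall below that is topped up by
    # sampling from the secondary (and possibly pareto) corpora.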

    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        # If we first saw a bug more than 10 seconds ago, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
            or time.monotonic() - self.first_bug_found_time > 10
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )
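    # Illustrative arithmetic (not part of the engine): if the first bug was
    # found at call 50 and the most recent at call 200, generation continues
    # while call_count < min(50 + 1000, 200 * 2) == 400 (or while fewer than
    # MIN_TEST_CALLS calls have been made at all).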

    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #

            # We don't (yet) rely on the zero data being pinned, and so it's
            # simply a very slight performance loss to not pin it if doing so
            # would error.

            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural input for this test is very "
                "large. This makes it difficult for Hypothesis to generate "
                "good inputs, especially when trying to shrink failing inputs."
                "\n\n"
                "Consider reducing the amount of data generated by the strategy. "
                "Also consider introducing small alternative values for some "
                "strategies. For example, could you "

1141 "mark some arguments as optional by replacing `some_complex_strategy`" 

1142 "with `st.none() | some_complex_strategy`?" 

1143 "\n\n" 

1144 "If you are confident that the size of the smallest natural input " 

1145 "to your test cannot be reduced, you can suppress this health check " 

1146 "with @settings(suppress_health_check=[HealthCheck.large_base_example]). " 

1147 "See " 

1148 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

1149 "for details.", 

1150 HealthCheck.large_base_example, 

1151 ) 

1152 

1153 self.health_check_state = HealthCheckState() 

1154 

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.

        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
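        # Illustrative arithmetic (not part of the engine): with the default
        # max_examples=100, small_example_cap is min(10, 50) == 10 and
        # optimise_at is max(50, 11, 10) == 50; with max_examples=30 they are
        # 3 and max(15, 4, 10) == 15 respectively.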

        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue

                # Because the status is at least Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)

                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.

                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(self, data: ConjectureData | ConjectureResult) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()

                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.

1318 and self.call_count <= initial_calls + 5 

1319 and failed_mutations <= 5 

1320 ): 

1321 groups = data.spans.mutator_groups 

1322 if not groups: 

1323 break 

1324 

1325 group = self.random.choice(groups) 

1326 (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2) 

1327 if start1 > start2: 

1328 (start1, end1), (start2, end2) = (start2, end2), (start1, end1) 

1329 

1330 if ( 

1331 start1 <= start2 <= end2 <= end1 

1332 ): # pragma: no cover # flaky on conjecture-cover tests 

1333 # One span entirely contains the other. The strategy is very 

1334 # likely some kind of tree. e.g. we might have 

1335 # 

1336 # ┌─────┐ 

1337 # ┌─────┤ a ├──────┐ 

1338 # │ └─────┘ │ 

1339 # ┌──┴──┐ ┌──┴──┐ 

1340 # ┌──┤ b ├──┐ ┌──┤ c ├──┐ 

1341 # │ └──┬──┘ │ │ └──┬──┘ │ 

1342 # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ 

1343 # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │ 

1344 # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ 

1345 # 

1346 # where each node is drawn from the same strategy and so 

1347 # has the same span label. We might have selected the spans 

1348 # corresponding to the a and c nodes, which is the entire 

1349 # tree and the subtree of (and including) c respectively. 

1350 # 

1351 # There are two possible mutations we could apply in this case: 

1352 # 1. replace a with c (replace child with parent) 

1353 # 2. replace c with a (replace parent with child) 

                        #
                        # (1) results in multiple partial copies of the parent:
                        #
                        #                   ┌─────┐
                        #         ┌─────────┤  a  ├──────────┐
                        #         │         └─────┘          │
                        #      ┌──┴──┐                    ┌──┴──┐
                        #   ┌──┤  b  ├──┐           ┌─────┤  a  ├─────┐
                        #   │  └──┬──┘  │           │     └─────┘     │
                        # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐      ┌──┴──┐           ┌──┴──┐
                        # │ d │ │ e │ │ f │   ┌──┤  b  ├──┐     ┌──┤  c  ├──┐
                        # └───┘ └───┘ └───┘   │  └──┬──┘  │     │  └──┬──┘  │
                        #                   ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                        #                   │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
                        #                   └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
                        #
                        # while (2) results in truncating part of the parent:
                        #
                        #      ┌─────┐
                        #   ┌──┤  c  ├──┐
                        #   │  └──┬──┘  │
                        # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                        # │ g │ │ h │ │ i │
                        # └───┘ └───┘ └───┘
                        #
                        # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                        # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                        # except we do not repeat the replacement additional times
                        # (the paper repeats it once, for a total of two copies).
                        #
                        # We currently only apply mutation (1) and ignore mutation
                        # (2). The reason is that an attempt generated by (2) is
                        # always something that Hypothesis could easily have
                        # generated itself, simply by not making various choices,
                        # whereas duplicating the exact value and structure of
                        # particular choices as in (1) would have been hard for
                        # Hypothesis to hit by chance.
                        #
                        # TODO: an extension of this mutation might repeat (1) a
                        # number of times drawn from a geometric distribution
                        # between 0 and ~10. We would need to find the
                        # corresponding span to recurse on in the new choices,
                        # probably just by using the choices index.

                        # case (1): duplicate the choices in start1:start2.
                        attempt = data.choices[:start2] + data.choices[start1:]
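                        # For instance (illustrative values, not taken from the
                        # engine): with choices [a, b, c, d], a parent span
                        # (start1, end1) = (0, 4) and a child span
                        # (start2, end2) = (2, 4), this builds
                        # [a, b] + [a, b, c, d] = [a, b, a, b, c, d]: a full
                        # copy of the parent grafted in where the child was.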

                    else:
                        (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                        replacement = data.choices[start:end]
                        # We attempt to replace both spans with whichever
                        # replacement we chose. Note that this might get the
                        # span boundaries wrong - matching labels is only a
                        # best guess as to whether the two are equivalent -
                        # but that doesn't really matter. Even if it does not
                        # achieve the desired result, it is still a perfectly
                        # acceptable choice sequence to try.
                        attempt = (
                            data.choices[:start1]
                            + replacement
                            + data.choices[end1:start2]
                            + replacement
                            + data.choices[end2:]
                        )
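                        # As an illustration (values invented for this note):
                        # with choices [a, b, c, d, e, f], spans (1, 2) and
                        # (4, 5), and replacement = choices[4:5] = [e], this
                        # builds [a] + [e] + [c, d] + [e] + [f]
                        # = [a, e, c, d, e, f], so both spans now hold the
                        # same copied choices.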

                    try:
                        new_data = self.cached_test_function(
                            attempt,
                            # We set error_on_discard so that we don't end up
                            # entering parts of the tree we consider redundant
                            # and not worth exploring.
                            error_on_discard=True,
                        )
                    except ContainsDiscard:
                        failed_mutations += 1
                        continue

                    if new_data is Overrun:
                        failed_mutations += 1  # pragma: no cover  # annoying case
                    else:
                        assert isinstance(new_data, ConjectureResult)
                        if (
                            new_data.status >= data.status
                            and choices_key(data.choices) != choices_key(new_data.choices)
                            and all(
                                k in new_data.target_observations
                                and new_data.target_observations[k] >= v
                                for k, v in data.target_observations.items()
                            )
                        ):
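                            # That is, the mutant is kept only when it is at
                            # least as good on every axis we track: status,
                            # novelty of the choice sequence, and all observed
                            # target scores.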

                            data = new_data
                            failed_mutations = 0
                        else:
                            failed_mutations += 1

    def optimise_targets(self) -> None:
        """If any target observations have been made, attempt to optimise them
        all."""
        if not self.should_optimise:
            return
        from hypothesis.internal.conjecture.optimiser import Optimiser

        # We want to avoid running the optimiser for too long in case we hit
        # an unbounded target score. We start this off fairly conservatively
        # in case interesting examples are easy to find and then ramp it up
        # on an exponential schedule so we don't hamper the optimiser too much
        # if it needs a long time to find good enough improvements.
        max_improvements = 10
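        # (Editorial note: the cap doubles on every pass of the loop below, so
        # it follows 10, 20, 40, 80, ... across successive passes.)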

        while True:
            prev_calls = self.call_count

            any_improvements = False

            for target, data in list(self.best_examples_of_observed_targets.items()):
                optimiser = Optimiser(
                    self, data, target, max_improvements=max_improvements
                )
                optimiser.run()
                if optimiser.improvements > 0:
                    any_improvements = True

            if self.interesting_examples:
                break

            max_improvements *= 2

            if any_improvements:
                continue

            if self.best_observed_targets:
                self.pareto_optimise()

            if prev_calls == self.call_count:
                break

    def pareto_optimise(self) -> None:
        if self.pareto_front is not None:
            ParetoOptimiser(self).run()

    def _run(self) -> None:
        # We have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
            # Fast path for development: if the database gave us interesting
            # examples from the previously stored primary key, don't try
            # shrinking them again, as it's unlikely to work.
            if self.reused_previously_shrunk_test_case:
                self.exit_with(ExitReason.finished)
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
            # We normally run the targeting phase mixed in with the generate
            # phase, but if we've been asked to run targeting without
            # generation then we have to run it explicitly on its own here.
            if Phase.generate not in self.settings.phases:
                self._current_phase = "target"
                self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)

    def new_conjecture_data(
        self,
        prefix: Sequence[ChoiceT | ChoiceTemplate],
        *,
        observer: DataObserver | None = None,
        max_choices: int | None = None,
    ) -> ConjectureData:
        provider = (
            HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
        )
        observer = observer or self.tree.new_observer()
        if not self.using_hypothesis_backend:
            observer = DataObserver()
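            # (Editorial note: presumably so that choices drawn by alternative
            # backends are not recorded into the DataTree; the engine simply
            # swaps in a no-op DataObserver here.)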

        return ConjectureData(
            prefix=prefix,
            observer=observer,
            provider=provider,
            max_choices=max_choices,
            random=self.random,
        )

    def shrink_interesting_examples(self) -> None:
        """If we've found interesting examples, try to replace each of them
        with a minimal interesting example with the same interesting_origin.

        We may find one or more examples with a new interesting_origin
        during the shrink process. If so we shrink these too.
        """
        if Phase.shrink not in self.settings.phases or not self.interesting_examples:
            return

        self.debug("Shrinking interesting examples")
        self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS

        for prev_data in sorted(
            self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
        ):
            assert prev_data.status == Status.INTERESTING
            data = self.new_conjecture_data(prev_data.choices)
            self.test_function(data)
            if data.status != Status.INTERESTING:
                self.exit_with(ExitReason.flaky)

        self.clear_secondary_key()

        while len(self.shrunk_examples) < len(self.interesting_examples):
            target, example = min(
                (
                    (k, v)
                    for k, v in self.interesting_examples.items()
                    if k not in self.shrunk_examples
                ),
                key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
            )
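            # (Editorial note: shortlex orders by length first and then
            # lexicographically, so e.g. "z" sorts before "aa"; ties in the
            # choice-sequence sort key are broken by the shorter,
            # lexicographically smaller repr of the interesting origin.)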

            self.debug(f"Shrinking {target!r}: {example.choices}")

            if not self.settings.report_multiple_bugs:
                # If multi-bug reporting is disabled, we shrink our currently-minimal
                # failure, allowing 'slips' to any bug with a smaller minimal example.
                self.shrink(example, lambda d: d.status == Status.INTERESTING)
                return

            def predicate(d: ConjectureResult | _Overrun) -> bool:
                if d.status < Status.INTERESTING:
                    return False
                d = cast(ConjectureResult, d)
                return d.interesting_origin == target

            self.shrink(example, predicate)

            self.shrunk_examples.add(target)

    def clear_secondary_key(self) -> None:
        if self.has_existing_examples():
            # If we have any smaller examples in the secondary corpus, now is
            # a good time to try them to see if they work as shrinks. They
            # probably won't, but it's worth a shot and gives us a good
            # opportunity to clear out the database.

            # It's not worth trying the primary corpus because we already
            # tried all of those in the initial phase.
            corpus = sorted(
                self.settings.database.fetch(self.secondary_key), key=shortlex
            )
            for c in corpus:
                choices = choices_from_bytes(c)
                if choices is None:
                    self.settings.database.delete(self.secondary_key, c)
                    continue
                primary = {
                    choices_to_bytes(v.choices)
                    for v in self.interesting_examples.values()
                }
                if shortlex(c) > max(map(shortlex, primary)):
                    break
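                    # (Editorial note: corpus is sorted by shortlex, so every
                    # remaining entry is at least as large and cannot improve
                    # on any primary example either.)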

                self.cached_test_function(choices)
                # We unconditionally remove c from the secondary key, as it is
                # either now primary or worse than our primary example for this
                # interesting origin.
                self.settings.database.delete(self.secondary_key, c)

    def shrink(
        self,
        example: ConjectureData | ConjectureResult,
        predicate: ShrinkPredicateT | None = None,
        allow_transition: (
            Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None
        ) = None,
    ) -> ConjectureData | ConjectureResult:
        s = self.new_shrinker(example, predicate, allow_transition)
        s.shrink()
        return s.shrink_target

    def new_shrinker(
        self,
        example: ConjectureData | ConjectureResult,
        predicate: ShrinkPredicateT | None = None,
        allow_transition: (
            Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None
        ) = None,
    ) -> Shrinker:
        return Shrinker(
            self,
            example,
            predicate,
            allow_transition=allow_transition,
            explain=Phase.explain in self.settings.phases,
            in_target_phase=self._current_phase == "target",
        )

    def passing_choice_sequences(
        self, prefix: Sequence[ChoiceNode] = ()
    ) -> frozenset[tuple[ChoiceNode, ...]]:
        """Return a collection of choice sequence nodes which cause the test to pass.
        Optionally restrict this by a certain prefix, which is useful for explain mode.
        """
        return frozenset(
            cast(ConjectureResult, result).nodes
            for key in self.__data_cache
            if (result := self.__data_cache[key]).status is Status.VALID
            and startswith(cast(ConjectureResult, result).nodes, prefix)
        )


class ContainsDiscard(Exception):
    pass