# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import threading
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyBackendFailure,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observability_callback
from hypothesis.reporting import base_report, report

# In most cases, the following constants are all Final. However, we do allow users
# to monkeypatch all of these variables, which means we cannot annotate them as
# Final or mypyc will inline them and render monkeypatching useless.

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling down a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: int = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: int = 300

#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: int = 8 * 1024
CACHE_SIZE: int = 10000
MIN_TEST_CALLS: int = 10
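
# A minimal sketch of the monkeypatching the comment above allows for
# (the attribute names are the real ones defined here; the values are
# arbitrary, illustrative choices):
#
#     import hypothesis.internal.conjecture.engine as engine
#
#     engine.MAX_SHRINKS = 2000            # keep shrinking past the default cap
#     engine.MAX_SHRINKING_SECONDS = 3600  # allow up to an hour of shrinking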


def shortlex(s):
    return (len(s), s)
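
# A minimal sketch of the ordering shortlex induces (illustrative only):
#
#     >>> sorted([b"bb", b"a", b"ab"], key=shortlex)
#     [b'a', b'ab', b'bb']
#
# Shorter serialized examples sort first, breaking ties lexicographically,
# which is the order reuse_existing_examples below relies on.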


@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: defaultdict[str, list[float]] = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n  {'':^{width}}   count | fraction |    slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f"  (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f"  {arg:^{width}} | {len(times):>4}  | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%}  |  {desc}"
            )
        return "\n".join(out)
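
# An illustrative sketch of the table timing_report() renders (hypothetical
# numbers; real column widths depend on the longest argument name):
#
#          count | fraction |    slowest draws (seconds)
#     xs |   20  |     95%  |  0.101, 0.120, 0.141, 0.160, 0.199
#     n  |   20  |      5%  |   --    --    --  0.012, 0.015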


class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
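
# For example (illustrative): with settings(max_examples=100),
# ExitReason.max_examples.describe(settings) == "settings.max_examples=100".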


class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    module_name, class_name = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(module_name), class_name)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
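
# A minimal sketch of the registry shape _get_provider consumes (the entry
# below is hypothetical; real entries are registered in conjecture.providers):
#
#     AVAILABLE_PROVIDERS["mybackend"] = "mypackage.provider.MyProvider"
#
# Classes with lifetime == "test_function" are instantiated once here and
# reused, while lifetime == "test_case" classes are returned uninstantiated
# and constructed fresh for every test case.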


class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
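
# For example (illustrative): concrete choices count as 1 each, a template
# with a concrete count adds that count, and any template with count=None
# makes the total indeterminate:
#
#     choice_count([1, True, b""]) == 3
#     choice_count([1, ChoiceTemplate("simplest", count=4)]) == 5
#     choice_count([ChoiceTemplate("simplest", count=None)]) is None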


class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints
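
# A minimal sketch of the signature check above (illustrative only):
#
#     def realize_old(value):                        # legacy: no keyword
#         return value
#
#     def realize_new(value, *, for_failure=False):
#         return value
#
#     "for_failure" in inspect.signature(realize_old).parameters  # False
#     "for_failure" in inspect.signature(realize_new).parameters  # True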


class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
        thread_overlap: Optional[dict[int, bool]] = None,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        self.shrunk_examples: set[InterestingOrigin] = set()
        self.health_check_state: Optional[HealthCheckState] = None
        self.tree: DataTree = DataTree()
        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a choice sequence without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None

    @contextmanager
    def _with_switch_to_hypothesis_provider(
        self, value: bool
    ) -> Generator[None, None, None]:
        previous = self._switch_to_hypothesis_provider
        try:
            self._switch_to_hypothesis_provider = value
            yield
        finally:
            self._switch_to_hypothesis_provider = previous

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing
                # extensions of the nodes, it could in fact run to a valid result
                # if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this
                # result for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
        try:
            return self.__data_cache[key]
        except KeyError:
            pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us.
        self.test_function(data)
        return data.as_result()
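
    # An illustrative note on ``extend`` (not part of the original source):
    # with three concrete choices and extend=2, a replay may draw at most
    # choice_count(choices) + 2 == 5 choices before counting as an overrun,
    # while extend="full" places no limit on how far a replay may run past
    # the given prefix.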


    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)
                # we're already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.
                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

                self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)
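
    # A worked sketch of the iteration limit above (illustrative): with
    # settings(max_examples=50), generation stops after
    # max(50 * 10, 1000) == 1000 calls even if fewer than 50 of them were
    # valid, which is what ExitReason.max_iterations reports.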


    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Generated inputs routinely consumed more than the maximum "
                f"allowed entropy: {state.valid_examples} inputs were generated "
                f"successfully, while {state.overrun_examples} inputs exceeded the "
                f"maximum allowed entropy during generation."
                "\n\n"
                "Testing with inputs this large tends to be slow, and to produce "
                "failures that are both difficult to shrink and difficult to understand. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n\n"
                "If you expect the average size of your input to be this large, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like this test is filtering out a lot of inputs. "
                f"{state.valid_examples} inputs were generated successfully, "
                f"while {state.invalid_examples} inputs were filtered out. "
                "\n\n"
                "An input might be filtered out by calls to assume(), "
                "strategy.filter(...), or occasionally by Hypothesis internals."
                "\n\n"
                "Applying this much filtering makes input generation slow, since "
                "Hypothesis must discard inputs which are filtered out and try "
                "generating them again. It is also possible that applying this much "
                "filtering will distort the domain and/or distribution of the test, "
                "leaving your testing less rigorous than expected."
                "\n\n"
                "If you expect this many inputs to be filtered out during generation, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.filter_too_much,
            )

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time = state.total_draw_time
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
        if (
            draw_time > max(1.0, draw_time_limit.total_seconds())
            # we disable HealthCheck.too_slow under concurrent threads, since
            # cpython may switch away from a thread for arbitrarily long.
            and not self.thread_overlap.get(threading.get_ident(), False)
        ):
            extra_str = []
            if state.invalid_examples:
                extra_str.append(f"{state.invalid_examples} invalid inputs")
            if state.overrun_examples:
                extra_str.append(
                    f"{state.overrun_examples} inputs which exceeded the "
                    "maximum allowed entropy"
                )
            extra_str = ", and ".join(extra_str)
            extra_str = f" ({extra_str})" if extra_str else ""

            fail_health_check(
                self.settings,
                "Input generation is slow: Hypothesis only generated "
                f"{state.valid_examples} valid inputs after {draw_time:.2f} "
                f"seconds{extra_str}."
                "\n" + state.timing_report() + "\n\n"
                "This could be for a few reasons:"
                "\n"
                "1. This strategy could be generating too much data per input. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n"
                "2. Some other expensive computation could be running during input "
                "generation. For example, "
                "if @st.composite or st.data() is interspersed with an expensive "
                "computation, HealthCheck.too_slow is likely to trigger. If this "
                "computation is unrelated to input generation, move it elsewhere. "
                "Otherwise, try making it more efficient, or disable this health "
                "check if that is not possible."
                "\n\n"
                "If you expect input generation to take this long, you can disable "
                "this health check with "
                "@settings(suppress_health_check=[HealthCheck.too_slow]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.too_slow,
            )
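
    # A worked sketch of the draw-time budget above (illustrative): with
    # deadline=timedelta(milliseconds=200) the limit is
    # max(1.0, 5 * 0.2) == 1.0 seconds of total draw time, and with
    # deadline=None it is max(1.0, 5 * 6.0) == 30.0 seconds.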


    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))

    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")
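
    # For example (illustrative): with database_key=b"deadbeef", the three
    # corpora live under b"deadbeef" (primary), b"deadbeef.secondary", and
    # b"deadbeef.pareto" in the ExampleDatabase.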


    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observability_callback(on_observation)
        return nullcontext()

    def run(self) -> None:
        with local_settings(self.settings):
            # NOTE: For compatibility with Python 3.9's LL(1)
            # parser, this is written as a nested with-statement,
            # instead of a compound one.
            with self.observe_for_provider():
                try:
                    self._run()
                except RunIsComplete:
                    pass
                for v in self.interesting_examples.values():
                    self.debug_data(v)
                self.debug(
                    "Run complete after %d examples (%d valid) and %d shrinks"
                    % (self.call_count, self.valid_examples, self.shrinks)
                )
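
    # A minimal sketch of driving the runner directly (illustrative; the
    # test body and settings are made up, and hypothesis.core is the real
    # caller):
    #
    #     def test(data: ConjectureData) -> None:
    #         n = data.draw_integer(0, 100)
    #         assert n <= 90
    #
    #     runner = ConjectureRunner(test, settings=Settings(max_examples=50))
    #     runner.run()
    #     runner.interesting_examples  # {origin: shrunk ConjectureResult}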


    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        from which the last failure came, though we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break

    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )
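
    # A worked sketch of the heuristic above (illustrative): if the first
    # bug appeared at call 50 and the most recent at call 300, generation
    # continues until call min(50 + 1000, 300 * 2) == 600, i.e. it stops
    # once the latest half of the test effort has been fruitless.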


    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, and so it's
            # only a very slight performance loss to simply not pin it if doing
            # so would error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural input for this test is very "
                "large. This makes it difficult for Hypothesis to generate "
                "good inputs, especially when trying to shrink failing inputs."
                "\n\n"
                "Consider reducing the amount of data generated by the strategy. "
                "Also consider introducing small alternative values for some "
                "strategies. For example, could you "
                "mark some arguments as optional by replacing `some_complex_strategy` "
                "with `st.none() | some_complex_strategy`?"
                "\n\n"
                "If you are confident that the size of the smallest natural input "
                "to your test cannot be reduced, you can suppress this health check "
                "with @settings(suppress_health_check=[HealthCheck.large_base_example]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
        ran_optimisations = False
        self._switch_to_hypothesis_provider = False
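
        # A worked sketch of the caps above (illustrative): with
        # max_examples=100, small_example_cap == min(100 // 10, 50) == 10
        # and optimise_at == max(100 // 2, 11, 10) == 50, so size-limited
        # generation applies to roughly the first ten valid examples and
        # target optimisation kicks in around the fiftieth.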


        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the status is at least Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #               ┌─────┐
                    #         ┌─────┤  a  ├─────┐
                    #         │     └─────┘     │
                    #      ┌──┴──┐           ┌──┴──┐
                    #   ┌──┤  b  ├──┐     ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │     │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace a with c (replace child with parent)
                    # 2. replace c with a (replace parent with child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #                   ┌─────┐
                    #         ┌─────────┤  a  ├──────────┐
                    #         │         └─────┘          │
                    #      ┌──┴──┐                    ┌──┴──┐
                    #   ┌──┤  b  ├──┐           ┌─────┤  a  ├─────┐
                    #   │  └──┬──┘  │           │     └─────┘     │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐      ┌──┴──┐           ┌──┴──┐
                    # │ d │ │ e │ │ f │   ┌──┤  b  ├──┐     ┌──┤  c  ├──┐
                    # └───┘ └───┘ └───┘   │  └──┬──┘  │     │  └──┬──┘  │
                    #                   ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #                   │ d │ │ e │ │ f │ │ g │ │ h │ │ i │
                    #                   └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #      ┌─────┐
                    #   ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]

1391 else: 

1392 (start, end) = self.random.choice([(start1, end1), (start2, end2)]) 

1393 replacement = data.choices[start:end] 

1394 # We attempt to replace both spans with whichever 

1395 # of the two we selected. Note that this might end 

1396 # up messing up and getting the span boundaries 

1397 # wrong - label matching is only a best guess as to 

1398 # whether the two are equivalent - but it doesn't 

1399 # really matter. It may not achieve the desired result, 

1400 # but it's still a perfectly acceptable choice sequence 

1401 # to try. 

1402 attempt = ( 

1403 data.choices[:start1] 

1404 + replacement 

1405 + data.choices[end1:start2] 

1406 + replacement 

1407 + data.choices[end2:] 

1408 ) 

1409 

1410 try: 

1411 new_data = self.cached_test_function( 

1412 attempt, 

1413 # We set error_on_discard so that we don't end up 

1414 # entering parts of the tree we consider redundant 

1415 # and not worth exploring. 

1416 error_on_discard=True, 

1417 ) 

1418 except ContainsDiscard: 

1419 failed_mutations += 1 

1420 continue 

1421 

1422 if new_data is Overrun: 

1423 failed_mutations += 1 # pragma: no cover # annoying case 

1424 else: 

1425 assert isinstance(new_data, ConjectureResult) 

1426 if ( 

1427 new_data.status >= data.status 

1428 and choices_key(data.choices) != choices_key(new_data.choices) 

1429 and all( 

1430 k in new_data.target_observations 

1431 and new_data.target_observations[k] >= v 

1432 for k, v in data.target_observations.items() 

1433 ) 

1434 ): 

1435 data = new_data 

1436 failed_mutations = 0 

1437 else: 

1438 failed_mutations += 1 

1439 
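# ---------------------------------------------------------------------
# [Editor's aside - illustrative sketch, not part of engine.py.] The two
# span mutations described above, reduced to plain list slicing. The
# function names are invented for this sketch and do not exist in the
# engine's API; ``choices`` stands in for ``data.choices``.

def duplicate_containing_span(choices, start1, end1, start2, end2):
    # Case (1): span (start1, end1) entirely contains (start2, end2).
    # Splice a copy of the outer span in at the inner span's start,
    # producing partial copies of the parent (Nautilus, Example IV.4).
    assert start1 <= start2 <= end2 <= end1
    return choices[:start2] + choices[start1:]

def replace_both_spans(choices, start1, end1, start2, end2):
    # Disjoint case: overwrite both spans with a copy of the first, so
    # the attempt contains the same subtree twice. (The engine picks
    # which of the two spans to copy at random; this sketch always
    # copies the first.)
    assert end1 <= start2
    replacement = choices[start1:end1]
    return (
        choices[:start1]
        + replacement
        + choices[end1:start2]
        + replacement
        + choices[end2:]
    )

# With choices == [0, 1, 2, 3, 4]:
#   duplicate_containing_span(choices, 0, 5, 2, 4) -> [0, 1, 0, 1, 2, 3, 4]
#   replace_both_spans(choices, 0, 2, 3, 5)        -> [0, 1, 2, 0, 1]
# ---------------------------------------------------------------------
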

1440 def optimise_targets(self) -> None: 

1441 """If any target observations have been made, attempt to optimise them 

1442 all.""" 

1443 if not self.should_optimise: 

1444 return 

1445 from hypothesis.internal.conjecture.optimiser import Optimiser 

1446 

1447 # We want to avoid running the optimiser for too long in case we hit 

1448 # an unbounded target score. We start the improvement cap off fairly 

1449 # conservatively in case interesting examples are easy to find, then 

1450 # ramp it up on an exponential schedule so we don't hamper the optimiser 

1451 # too much if it needs a long time to find good enough improvements. 

1452 max_improvements = 10 

1453 while True: 

1454 prev_calls = self.call_count 

1455 

1456 any_improvements = False 

1457 

1458 for target, data in list(self.best_examples_of_observed_targets.items()): 

1459 optimiser = Optimiser( 

1460 self, data, target, max_improvements=max_improvements 

1461 ) 

1462 optimiser.run() 

1463 if optimiser.improvements > 0: 

1464 any_improvements = True 

1465 

1466 if self.interesting_examples: 

1467 break 

1468 

1469 max_improvements *= 2 

1470 

1471 if any_improvements: 

1472 continue 

1473 

1474 if self.best_observed_targets: 

1475 self.pareto_optimise() 

1476 

1477 if prev_calls == self.call_count: 

1478 break 

1479 
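# ---------------------------------------------------------------------
# [Editor's aside - illustrative sketch, not part of engine.py.] The
# exponential budget schedule used by optimise_targets above, in
# isolation. ``improve`` stands in for one Optimiser pass over all
# targets and is an invented callable, not engine API.

def ramped_optimise(improve, initial_budget=10):
    budget = initial_budget
    while True:
        made_progress = improve(budget)
        budget *= 2  # double the cap each round, as in the loop above
        if not made_progress:
            break

# e.g. ramped_optimise(lambda budget: budget < 40) calls ``improve``
# with budgets 10, 20 and 40, stopping after the first round that
# reports no improvement.
# ---------------------------------------------------------------------
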

1480 def pareto_optimise(self) -> None: 

1481 if self.pareto_front is not None: 

1482 ParetoOptimiser(self).run() 

1483 

1484 def _run(self) -> None: 

1485 # have to use the primitive provider to interpret database bits... 

1486 self._switch_to_hypothesis_provider = True 

1487 with self._log_phase_statistics("reuse"): 

1488 self.reuse_existing_examples() 

1489 # Fast path for development: if the database gave us interesting 

1490 # examples from the previously stored primary key, don't try 

1491 # shrinking them again, as that is unlikely to make progress. 

1492 if self.reused_previously_shrunk_test_case: 

1493 self.exit_with(ExitReason.finished) 

1494 # ...but we should use the supplied provider when generating... 

1495 self._switch_to_hypothesis_provider = False 

1496 with self._log_phase_statistics("generate"): 

1497 self.generate_new_examples() 

1498 # We normally run the targeting phase mixed in with the generate phase, 

1499 # but if we've been asked to run it but not generation then we have to 

1500 # run it explicitly on its own here. 

1501 if Phase.generate not in self.settings.phases: 

1502 self._current_phase = "target" 

1503 self.optimise_targets() 

1504 # ...and back to the primitive provider when shrinking. 

1505 self._switch_to_hypothesis_provider = True 

1506 with self._log_phase_statistics("shrink"): 

1507 self.shrink_interesting_examples() 

1508 self.exit_with(ExitReason.finished) 

1509 
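# ---------------------------------------------------------------------
# [Editor's aside - descriptive summary, not part of engine.py.] Which
# provider interprets choices in each phase driven by _run above: True
# means the built-in HypothesisProvider, False means the user-selected
# backend.

PROVIDER_SWITCH_BY_PHASE = {
    "reuse": True,      # database entries are Hypothesis choice sequences
    "generate": False,  # honour the user's configured backend
    "target": False,    # runs mixed in with (or straight after) generate
    "shrink": True,     # shrinking replays known choice sequences
}
# ---------------------------------------------------------------------
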

1510 def new_conjecture_data( 

1511 self, 

1512 prefix: Sequence[Union[ChoiceT, ChoiceTemplate]], 

1513 *, 

1514 observer: Optional[DataObserver] = None, 

1515 max_choices: Optional[int] = None, 

1516 ) -> ConjectureData: 

1517 provider = ( 

1518 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1519 ) 

1520 observer = observer or self.tree.new_observer() 

1521 if not self.using_hypothesis_backend: 

1522 observer = DataObserver()  # no-op observer: alternative backends bypass our DataTree 

1523 

1524 return ConjectureData( 

1525 prefix=prefix, 

1526 observer=observer, 

1527 provider=provider, 

1528 max_choices=max_choices, 

1529 random=self.random, 

1530 ) 

1531 

1532 def shrink_interesting_examples(self) -> None: 

1533 """If we've found interesting examples, try to replace each of them 

1534 with a minimal interesting example with the same interesting_origin. 

1535 

1536 We may find one or more examples with a new interesting_origin 

1537 during the shrink process. If so, we shrink these too. 

1538 """ 

1539 if Phase.shrink not in self.settings.phases or not self.interesting_examples: 

1540 return 

1541 

1542 self.debug("Shrinking interesting examples") 

1543 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS 

1544 

1545 for prev_data in sorted( 

1546 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes) 

1547 ): 

1548 assert prev_data.status == Status.INTERESTING 

1549 data = self.new_conjecture_data(prev_data.choices) 

1550 self.test_function(data) 

1551 if data.status != Status.INTERESTING: 

1552 self.exit_with(ExitReason.flaky)  # the failure did not reproduce on replay 

1553 

1554 self.clear_secondary_key() 

1555 

1556 while len(self.shrunk_examples) < len(self.interesting_examples): 

1557 target, example = min( 

1558 ( 

1559 (k, v) 

1560 for k, v in self.interesting_examples.items() 

1561 if k not in self.shrunk_examples 

1562 ), 

1563 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))), 

1564 ) 

1565 self.debug(f"Shrinking {target!r}: {example.choices}") 

1566 

1567 if not self.settings.report_multiple_bugs: 

1568 # If multi-bug reporting is disabled, we shrink our currently-minimal 

1569 # failure, allowing 'slips' to any bug with a smaller minimal example. 

1570 self.shrink(example, lambda d: d.status == Status.INTERESTING) 

1571 return 

1572 

1573 def predicate(d: Union[ConjectureResult, _Overrun]) -> bool: 

1574 if d.status < Status.INTERESTING: 

1575 return False 

1576 d = cast(ConjectureResult, d) 

1577 return d.interesting_origin == target 

1578 

1579 self.shrink(example, predicate) 

1580 

1581 self.shrunk_examples.add(target) 

1582 
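# ---------------------------------------------------------------------
# [Editor's aside - illustrative sketch, not part of engine.py.] The
# min(...) above picks the smallest unshrunk failure by sort_key of its
# nodes, breaking ties by shortlex of the repr of its interesting
# origin. shortlex orders first by length and then lexicographically;
# a standalone stand-in for the real helper:

def shortlex_key(s):
    return (len(s), s)

origins = ["ValueError", "KeyError", "IndexError"]
assert sorted(origins, key=shortlex_key) == ["KeyError", "IndexError", "ValueError"]
# ---------------------------------------------------------------------
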

1583 def clear_secondary_key(self) -> None: 

1584 if self.has_existing_examples(): 

1585 # If we have any smaller examples in the secondary corpus, now is 

1586 # a good time to try them to see if they work as shrinks. They 

1587 # probably won't, but it's worth a shot and gives us a good 

1588 # opportunity to clear out the database. 

1589 

1590 # It's not worth trying the primary corpus because we already 

1591 # tried all of those in the initial phase. 

1592 corpus = sorted( 

1593 self.settings.database.fetch(self.secondary_key), key=shortlex 

1594 ) 

1595 for c in corpus: 

1596 choices = choices_from_bytes(c) 

1597 if choices is None: 

1598 self.settings.database.delete(self.secondary_key, c) 

1599 continue 

1600 primary = { 

1601 choices_to_bytes(v.choices) 

1602 for v in self.interesting_examples.values() 

1603 } 

1604 if shortlex(c) > max(map(shortlex, primary)): 

1605 break 

1606 

1607 self.cached_test_function(choices) 

1608 # We unconditionally remove c from the secondary key, as it 

1609 # is either now in the primary corpus or worse than our primary 

1610 # example for this interesting_origin. 

1611 self.settings.database.delete(self.secondary_key, c) 

1612 
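# ---------------------------------------------------------------------
# [Editor's aside - illustrative sketch, not part of engine.py.] The
# corpus entries handled above are byte-serialised choice sequences,
# using the helpers imported at the top of this file; the example
# values here are arbitrary.

raw = choices_to_bytes([0, "a", b"\x01"])  # bytes suitable for the database
restored = choices_from_bytes(raw)  # the original choices, or None if corrupt
# ---------------------------------------------------------------------
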

1613 def shrink( 

1614 self, 

1615 example: Union[ConjectureData, ConjectureResult], 

1616 predicate: Optional[ShrinkPredicateT] = None, 

1617 allow_transition: Optional[ 

1618 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1619 ] = None, 

1620 ) -> Union[ConjectureData, ConjectureResult]: 

1621 s = self.new_shrinker(example, predicate, allow_transition) 

1622 s.shrink() 

1623 return s.shrink_target 

1624 

1625 def new_shrinker( 

1626 self, 

1627 example: Union[ConjectureData, ConjectureResult], 

1628 predicate: Optional[ShrinkPredicateT] = None, 

1629 allow_transition: Optional[ 

1630 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1631 ] = None, 

1632 ) -> Shrinker: 

1633 return Shrinker( 

1634 self, 

1635 example, 

1636 predicate, 

1637 allow_transition=allow_transition, 

1638 explain=Phase.explain in self.settings.phases, 

1639 in_target_phase=self._current_phase == "target", 

1640 ) 

1641 

1642 def passing_choice_sequences( 

1643 self, prefix: Sequence[ChoiceNode] = () 

1644 ) -> frozenset[tuple[ChoiceNode, ...]]: 

1645 """Return a collection of choice sequence nodes which cause the test to pass. 

1646 Optionally restrict this by a certain prefix, which is useful for explain mode. 

1647 """ 

1648 return frozenset( 

1649 cast(ConjectureResult, result).nodes 

1650 for key in self.__data_cache 

1651 if (result := self.__data_cache[key]).status is Status.VALID 

1652 and startswith(cast(ConjectureResult, result).nodes, prefix) 

1653 ) 

1654 
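# ---------------------------------------------------------------------
# [Editor's aside - illustrative sketch, not part of engine.py.] The
# ``startswith`` call in passing_choice_sequences is plain prefix
# testing over node tuples; a minimal stand-in for the real junkdrawer
# helper:

def starts_with(seq, prefix):
    return seq[: len(prefix)] == tuple(prefix)

assert starts_with((1, 2, 3), (1, 2))
assert not starts_with((1, 2, 3), (2,))
# ---------------------------------------------------------------------
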

1655 

1656class ContainsDiscard(Exception): 

1657 pass