
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import threading
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import AbstractContextManager, contextmanager, nullcontext, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyBackendFailure,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import Observation, with_observability_callback
from hypothesis.reporting import base_report, report

# In most cases, the following constants are all Final. However, we do allow users
# to monkeypatch all of these variables, which means we cannot annotate them as
# Final or mypyc will inline them and render monkeypatching useless.

#: The maximum number of times the shrinker will reduce the complexity of a failing
#: input before giving up. This avoids falling down a trap of exponential (or worse)
#: complexity, where the shrinker appears to be making progress but will take a
#: substantially long time to finish completely.
MAX_SHRINKS: int = 500

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)

#: The maximum total time in seconds that the shrinker will try to shrink a failure
#: for before giving up. This is across all shrinks for the same failure, so even
#: if the shrinker successfully reduces the complexity of a single failure several
#: times, it will stop when it hits |MAX_SHRINKING_SECONDS| of total time taken.
MAX_SHRINKING_SECONDS: int = 300


#: The maximum amount of entropy a single test case can use before giving up
#: while making random choices during input generation.
#:
#: The "unit" of one |BUFFER_SIZE| does not have any defined semantics, and you
#: should not rely on it, except that a linear increase in |BUFFER_SIZE| will
#: linearly increase the amount of entropy a test case can use during generation.
BUFFER_SIZE: int = 8 * 1024

CACHE_SIZE: int = 10000
MIN_TEST_CALLS: int = 10


def shortlex(s):
    return (len(s), s)
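
# For example, sorted([b"ba", b"c", b"ab"], key=shortlex) == [b"c", b"ab", b"ba"]:
# shorter values sort first, and equal-length values are ordered lexicographically.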



@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: defaultdict[str, list[float]] = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
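        # sum(..., start=[]) concatenates the per-argument lists, so e.g.
        # {"a": [0.1, 0.2], "b": [0.3]} flattens to [0.1, 0.2, 0.3] and sums to 0.6.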

        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <0.5ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)
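
    # Illustrative output shape (all numbers invented for the example):
    #
    #       count | fraction | slowest draws (seconds)
    #    x |   24 |     65%  |  0.105, 0.201, 0.233, 0.252, 0.310
    #    y |   12 |     35%  |   -- ,  -- , 0.151, 0.204, 0.402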



class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
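
    # e.g. ExitReason.max_examples.describe(settings) renders to
    # "settings.max_examples=100" when settings.max_examples == 100.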



class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    provider_cls = AVAILABLE_PROVIDERS[backend]
    if isinstance(provider_cls, str):
        module_name, class_name = provider_cls.rsplit(".", 1)
        provider_cls = getattr(importlib.import_module(module_name), class_name)

    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )
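
# Note the asymmetry above: a provider with lifetime == "test_function" is
# instantiated once here and shared across the whole run, while one with
# lifetime == "test_case" is returned as a class, to be instantiated afresh
# for each test case.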



class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
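
# e.g. choice_count([True, 0.0]) == 2, and
# choice_count([True, ChoiceTemplate("simplest", count=3)]) == 4, while any
# template with count=None makes the total unknowable, so the result is None.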



class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints
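
# A provider that defines `def realize(self, value, *, for_failure=False)` is
# passed for_failure=True when a failing test case is being realized; an older
# provider whose realize() lacks that parameter still works, but triggers the
# deprecation warning above.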



class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
        thread_overlap: Optional[dict[int, bool]] = None,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        self.interesting_examples: dict[InterestingOrigin, ConjectureResult] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        self.shrunk_examples: set[InterestingOrigin] = set()
        self.health_check_state: Optional[HealthCheckState] = None
        self.tree: DataTree = DataTree()
        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a choice sequence without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._backend_exceeded_deadline: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None


    @contextmanager
    def _with_switch_to_hypothesis_provider(
        self, value: bool
    ) -> Generator[None, None, None]:
        previous = self._switch_to_hypothesis_provider
        try:
            self._switch_to_hypothesis_provider = value
            yield
        finally:
            self._switch_to_hypothesis_provider = previous
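
    # Usage (as in the failure-replay path in test_function below):
    #     with self._with_switch_to_hypothesis_provider(True):
    #         self.__stoppable_test_function(data)
    # The previous value is restored on exit, even if the body raises.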


    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # if we have a cached overrun for this key, but we're allowing extensions
                # of the nodes, it could in fact run to a valid data if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend
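        # e.g. extend=2 on top of three concrete choices gives max_length == 5,
        # while extend="full" (or an unbounded ChoiceTemplate) leaves it uncapped.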


        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass

            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this
                # result for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun

            try:
                return self.__data_cache[key]
            except KeyError:
                pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us.
        self.test_function(data)
        return data.as_result()


    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.

                initial_exception = data.expected_exception
                data = ConjectureData.for_choices(data.choices)
                # we're already going to use the hypothesis provider for this
                # data, so the verb "switch" is a bit misleading here. We're really
                # setting this to inform our on_observation logic that the observation
                # generated here was from a hypothesis backend, and shouldn't be
                # sent to the on_observation of any alternative backend.

                with self._with_switch_to_hypothesis_provider(True):
                    self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Should same-origin also be checked? (discussion in
                # https://github.com/HypothesisWorks/hypothesis/pull/4470#discussion_r2217055487)
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    raise FlakyBackendFailure(
                        f"Inconsistent results from replaying a failing test case! "
                        f"Raised {type(initial_exception).__name__} on "
                        f"backend={self.settings.backend!r}, but "
                        f"{desc_new_status} under backend='hypothesis'.",
                        [initial_exception],
                    )

                self._cache(data)

            assert data.interesting_origin is not None
            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)


    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)


    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Generated inputs routinely consumed more than the maximum "
                f"allowed entropy: {state.valid_examples} inputs were generated "
                f"successfully, while {state.overrun_examples} inputs exceeded the "
                f"maximum allowed entropy during generation."
                "\n\n"
                f"Testing with inputs this large tends to be slow, and to produce "
                "failures that are both difficult to shrink and difficult to understand. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n\n"
                "If you expect the average size of your input to be this large, "
                "you can disable this health check with "
                "@settings(suppress_health_check=[HealthCheck.data_too_large]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like this test is filtering out a lot of inputs. "
                f"{state.valid_examples} inputs were generated successfully, "
                f"while {state.invalid_examples} inputs were filtered out. "
                "\n\n"
                "An input might be filtered out by calls to assume(), "
                "strategy.filter(...), or occasionally by Hypothesis internals."

788 "\n\n" 

789 "Applying this much filtering makes input generation slow, since " 

790 "Hypothesis must discard inputs which are filtered out and try " 

791 "generating it again. It is also possible that applying this much " 

792 "filtering will distort the domain and/or distribution of the test, " 

793 "leaving your testing less rigorous than expected." 

794 "\n\n" 

795 "If you expect this many inputs to be filtered out during generation, " 

796 "you can disable this health check with " 

797 "@settings(suppress_health_check=[HealthCheck.filter_too_much]). See " 

798 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

799 "for details.", 

800 HealthCheck.filter_too_much, 

801 ) 

802 

803 # Allow at least the greater of one second or 5x the deadline. If deadline 

804 # is None, allow 30s - the user can disable the healthcheck too if desired. 

805 draw_time = state.total_draw_time 

806 draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6)) 
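        # e.g. deadline=timedelta(milliseconds=200) gives a 1.0s limit here, and
        # the max() below enforces the one-second floor; deadline=None gives 30s.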

        if (
            draw_time > max(1.0, draw_time_limit.total_seconds())
            # we disable HealthCheck.too_slow under concurrent threads, since
            # cpython may switch away from a thread for arbitrarily long.
            and not self.thread_overlap.get(threading.get_ident(), False)
        ):
            extra_str = []
            if state.invalid_examples:
                extra_str.append(f"{state.invalid_examples} invalid inputs")
            if state.overrun_examples:
                extra_str.append(
                    f"{state.overrun_examples} inputs which exceeded the "
                    "maximum allowed entropy"
                )
            extra_str = ", and ".join(extra_str)
            extra_str = f" ({extra_str})" if extra_str else ""

            fail_health_check(
                self.settings,
                "Input generation is slow: Hypothesis only generated "
                f"{state.valid_examples} valid inputs after {draw_time:.2f} "
                f"seconds{extra_str}."
                "\n" + state.timing_report() + "\n\n"
                "This could be for a few reasons:"
                "\n"
                "1. This strategy could be generating too much data per input. "
                "Try decreasing the amount of data generated, for example by "
                "decreasing the minimum size of collection strategies like "
                "st.lists()."
                "\n"
                "2. Some other expensive computation could be running during input "
                "generation. For example, "
                "if @st.composite or st.data() is interspersed with an expensive "
                "computation, HealthCheck.too_slow is likely to trigger. If this "
                "computation is unrelated to input generation, move it elsewhere. "
                "Otherwise, try making it more efficient, or disable this health "
                "check if that is not possible."
                "\n\n"
                "If you expect input generation to take this long, you can disable "
                "this health check with "
                "@settings(suppress_health_check=[HealthCheck.too_slow]). See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.too_slow,
            )


    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))
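
    # Key layout, all derived from database_key: the primary corpus lives under
    # database_key itself, while e.g. sub_key(b"secondary") maps to
    # database_key + b".secondary" (and likewise b"pareto" below).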


    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")


    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def observe_for_provider(self) -> AbstractContextManager:
        def on_observation(observation: Observation) -> None:
            assert observation.type == "test_case"
            # because lifetime == "test_function"
            assert isinstance(self.provider, PrimitiveProvider)
            # only fire if we actually used that provider to generate this observation
            if not self._switch_to_hypothesis_provider:
                self.provider.on_observation(observation)

        if (
            self.settings.backend != "hypothesis"
            # only for lifetime = "test_function" providers (guaranteed
            # by this isinstance check)
            and isinstance(self.provider, PrimitiveProvider)
            # and the provider opted-in to observations
            and self.provider.add_observability_callback
        ):
            return with_observability_callback(on_observation)
        return nullcontext()


    def run(self) -> None:
        with local_settings(self.settings):
            # NOTE: For compatibility with Python 3.9's LL(1)
            # parser, this is written as a nested with-statement,
            # instead of a compound one.
            with self.observe_for_provider():
                try:
                    self._run()
                except RunIsComplete:
                    pass
                for v in self.interesting_examples.values():
                    self.debug_data(v)
                self.debug(
                    "Run complete after %d examples (%d valid) and %d shrinks"
                    % (self.call_count, self.valid_examples, self.shrinks)
                )

    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database


    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
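            # e.g. with max_examples=100 and the generate phase enabled,
            # desired_size == max(2, ceil(0.1 * 100)) == 10 database entries.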

            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break


    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete


    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )
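
    # Worked example of the final clause: with the first bug found at call 50
    # and the latest at call 200, generation continues until call
    # min(50 + 1000, 2 * 200) == 400, i.e. it stops once the most recent half
    # of the run has found nothing new.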


    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #

            # We don't (yet) rely on the zero data being pinned, and so it's
            # only a very slight performance loss to simply not pin it if
            # doing so would error.

            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural input for this test is very "
                "large. This makes it difficult for Hypothesis to generate "
                "good inputs, especially when trying to shrink failing inputs."
                "\n\n"
                "Consider reducing the amount of data generated by the strategy. "
                "Also consider introducing small alternative values for some "
                "strategies. For example, could you "
                "mark some arguments as optional by replacing `some_complex_strategy` "
                "with `st.none() | some_complex_strategy`?"
                "\n\n"
                "If you are confident that the size of the smallest natural input "
                "to your test cannot be reduced, you can suppress this health check "
                "with @settings(suppress_health_check=[HealthCheck.large_base_example]). "
                "See "
                "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                "for details.",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()


        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
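        # e.g. max_examples=100 gives small_example_cap == min(10, 50) == 10
        # and optimise_at == max(50, 11, 10) == 50.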

        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the Status code is at least Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,

                # starting from the new novel prefix that it has discovered.

                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()


    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0


            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):

                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #               ┌─────┐
                    #         ┌─────┤  a  ├──────┐
                    #         │     └─────┘      │
                    #      ┌──┴──┐            ┌──┴──┐
                    #   ┌──┤  b  ├──┐      ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │      │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐  ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ d │ │ e │ │ f │  │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘  └───┘ └───┘ └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace c with a copy of a (replace the child with its parent)
                    # 2. replace a with c (replace the parent with its child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #               ┌─────┐
                    #         ┌─────┤  a  ├────────────┐
                    #         │     └─────┘            │
                    #      ┌──┴──┐                   ┌─┴───┐
                    #   ┌──┤  b  ├──┐          ┌─────┤  a  ├──────┐
                    #   │  └──┬──┘  │          │     └─────┘      │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐     ┌──┴──┐            ┌──┴──┐
                    # │ d │ │ e │ │ f │  ┌──┤  b  ├──┐      ┌──┤  c  ├──┐
                    # └───┘ └───┘ └───┘  │  └──┬──┘  │      │  └──┬──┘  │
                    #                  ┌─┴─┐ ┌─┴─┐ ┌─┴─┐  ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #                  │ d │ │ e │ │ f │  │ g │ │ h │ │ i │
                    #                  └───┘ └───┘ └───┘  └───┘ └───┘ └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #      ┌─────┐
                    #   ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]

1394 else: 

1395 (start, end) = self.random.choice([(start1, end1), (start2, end2)]) 

1396 replacement = data.choices[start:end] 

1397 # We attempt to replace both spans with

1398 # whichever of the two we chose. Note that this

1399 # might get the span boundaries wrong - matching

1400 # labels are only a best guess as to whether the

1401 # two spans are equivalent - but it doesn't really

1402 # matter: even if the attempt doesn't achieve the

1403 # desired result, it's still a perfectly acceptable

1404 # choice sequence to try.

1405 attempt = ( 

1406 data.choices[:start1] 

1407 + replacement 

1408 + data.choices[end1:start2] 

1409 + replacement 

1410 + data.choices[end2:] 

1411 ) 

1412 

1413 try: 

1414 new_data = self.cached_test_function( 

1415 attempt, 

1416 # We set error_on_discard so that we don't end up 

1417 # entering parts of the tree we consider redundant 

1418 # and not worth exploring. 

1419 error_on_discard=True, 

1420 ) 

1421 except ContainsDiscard: 

1422 failed_mutations += 1 

1423 continue 

1424 

1425 if new_data is Overrun: 

1426 failed_mutations += 1 # pragma: no cover # annoying case 

1427 else: 

1428 assert isinstance(new_data, ConjectureResult) 

1429 if ( 

1430 new_data.status >= data.status 

1431 and choices_key(data.choices) != choices_key(new_data.choices) 

1432 and all( 

1433 k in new_data.target_observations 

1434 and new_data.target_observations[k] >= v 

1435 for k, v in data.target_observations.items() 

1436 ) 

1437 ): 

1438 data = new_data 

1439 failed_mutations = 0 

1440 else: 

1441 failed_mutations += 1 

1442 
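# A minimal standalone sketch of the two splices constructed above, with plain
# lists standing in for choice sequences (duplicate_span and replace_both_spans
# are illustrative names, not Hypothesis API):

def duplicate_span(choices, start1, start2):
    # Mutation (1): when the span starting at start1 entirely contains the one
    # starting at start2, splicing choices[start1:] in at start2 duplicates the
    # outer structure, as in the Nautilus-style tree diagram above.
    return choices[:start2] + choices[start1:]

def replace_both_spans(choices, span1, span2, replacement):
    # The non-nested case: overwrite both spans with the same replacement.
    (start1, end1), (start2, end2) = span1, span2
    return (
        choices[:start1] + replacement
        + choices[end1:start2] + replacement
        + choices[end2:]
    )

assert duplicate_span(list("abdecg"), 0, 4) == list("abde") + list("abdecg")
assert replace_both_spans(list("abcdef"), (1, 2), (4, 5), ["X"]) == list("aXcdXf")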

1443 def optimise_targets(self) -> None: 

1444 """If any target observations have been made, attempt to optimise them 

1445 all.""" 

1446 if not self.should_optimise: 

1447 return 

1448 from hypothesis.internal.conjecture.optimiser import Optimiser 

1449 

1450 # We want to avoid running the optimiser for too long in case we hit 

1451 # an unbounded target score. We start this off fairly conservatively 

1452 # in case interesting examples are easy to find and then ramp it up 

1453 # on an exponential schedule so we don't hamper the optimiser too much 

1454 # if it needs a long time to find good enough improvements. 

1455 max_improvements = 10 

1456 while True: 

1457 prev_calls = self.call_count 

1458 

1459 any_improvements = False 

1460 

1461 for target, data in list(self.best_examples_of_observed_targets.items()): 

1462 optimiser = Optimiser( 

1463 self, data, target, max_improvements=max_improvements 

1464 ) 

1465 optimiser.run() 

1466 if optimiser.improvements > 0: 

1467 any_improvements = True 

1468 

1469 if self.interesting_examples: 

1470 break 

1471 

1472 max_improvements *= 2 

1473 

1474 if any_improvements: 

1475 continue 

1476 

1477 if self.best_observed_targets: 

1478 self.pareto_optimise() 

1479 

1480 if prev_calls == self.call_count: 

1481 break 

1482 
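# A minimal sketch of the budget schedule used above (illustrative, not
# Hypothesis API): max_improvements doubles on every full pass over the
# targets, so early passes stay cheap while a long optimisation run is
# eventually granted a large budget.
budget, schedule = 10, []
for _ in range(4):
    schedule.append(budget)
    budget *= 2
assert schedule == [10, 20, 40, 80]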

1483 def pareto_optimise(self) -> None: 

1484 if self.pareto_front is not None: 

1485 ParetoOptimiser(self).run() 

1486 

1487 def _run(self) -> None: 

1488 # have to use the primitive provider to interpret database bits... 

1489 self._switch_to_hypothesis_provider = True 

1490 with self._log_phase_statistics("reuse"): 

1491 self.reuse_existing_examples() 

1492 # Fast path for development: If the database gave us interesting 

1493 # examples from the previously stored primary key, don't try 

1494 # shrinking them again, as it's unlikely to work.

1495 if self.reused_previously_shrunk_test_case: 

1496 self.exit_with(ExitReason.finished) 

1497 # ...but we should use the supplied provider when generating... 

1498 self._switch_to_hypothesis_provider = False 

1499 with self._log_phase_statistics("generate"): 

1500 self.generate_new_examples() 

1501 # We normally run the targeting phase mixed in with the generate phase, 

1502 # but if we've been asked to run it but not generation then we have to 

1503 # run it explicitly on its own here. 

1504 if Phase.generate not in self.settings.phases: 

1505 self._current_phase = "target" 

1506 self.optimise_targets() 

1507 # ...and back to the primitive provider when shrinking. 

1508 self._switch_to_hypothesis_provider = True 

1509 with self._log_phase_statistics("shrink"): 

1510 self.shrink_interesting_examples() 

1511 self.exit_with(ExitReason.finished) 

1512 
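# A minimal sketch (illustrative, not Hypothesis API) of the provider switching
# performed by _run: replaying database entries and shrinking both interpret
# stored choices with the primitive HypothesisProvider, while novel generation
# runs on the configured backend.
PHASES = [
    ("reuse", "hypothesis"),
    ("generate", "backend"),
    ("target", "backend"),  # run standalone only when Phase.generate is disabled
    ("shrink", "hypothesis"),
]
for phase, provider in PHASES:
    print(f"{phase:>8}: {provider}")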

1513 def new_conjecture_data( 

1514 self, 

1515 prefix: Sequence[Union[ChoiceT, ChoiceTemplate]], 

1516 *, 

1517 observer: Optional[DataObserver] = None, 

1518 max_choices: Optional[int] = None, 

1519 ) -> ConjectureData: 

1520 provider = ( 

1521 HypothesisProvider if self._switch_to_hypothesis_provider else self.provider 

1522 ) 

1523 observer = observer or self.tree.new_observer() 

1524 if not self.using_hypothesis_backend: 

1525 observer = DataObserver() 

1526 

1527 return ConjectureData( 

1528 prefix=prefix, 

1529 observer=observer, 

1530 provider=provider, 

1531 max_choices=max_choices, 

1532 random=self.random, 

1533 ) 

1534 

1535 def shrink_interesting_examples(self) -> None: 

1536 """If we've found interesting examples, try to replace each of them 

1537 with a minimal interesting example with the same interesting_origin. 

1538 

1539 We may find one or more examples with a new interesting_origin 

1540 during the shrink process. If so we shrink these too. 

1541 """ 

1542 if Phase.shrink not in self.settings.phases or not self.interesting_examples: 

1543 return 

1544 

1545 self.debug("Shrinking interesting examples") 

1546 self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS 

1547 

1548 for prev_data in sorted( 

1549 self.interesting_examples.values(), key=lambda d: sort_key(d.nodes) 

1550 ): 

1551 assert prev_data.status == Status.INTERESTING 

1552 data = self.new_conjecture_data(prev_data.choices) 

1553 self.test_function(data) 

1554 if data.status != Status.INTERESTING: 

1555 self.exit_with(ExitReason.flaky) 

1556 

1557 self.clear_secondary_key() 

1558 

1559 while len(self.shrunk_examples) < len(self.interesting_examples): 

1560 target, example = min( 

1561 ( 

1562 (k, v) 

1563 for k, v in self.interesting_examples.items() 

1564 if k not in self.shrunk_examples 

1565 ), 

1566 key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))), 

1567 ) 

1568 self.debug(f"Shrinking {target!r}: {example.choices}") 

1569 

1570 if not self.settings.report_multiple_bugs: 

1571 # If multi-bug reporting is disabled, we shrink our currently-minimal 

1572 # failure, allowing 'slips' to any bug with a smaller minimal example. 

1573 self.shrink(example, lambda d: d.status == Status.INTERESTING) 

1574 return 

1575 

1576 def predicate(d: Union[ConjectureResult, _Overrun]) -> bool: 

1577 if d.status < Status.INTERESTING: 

1578 return False 

1579 d = cast(ConjectureResult, d) 

1580 return d.interesting_origin == target 

1581 

1582 self.shrink(example, predicate) 

1583 

1584 self.shrunk_examples.add(target) 

1585 
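# A minimal sketch of the selection rule above (shortlex_demo is a stand-in for
# the real shortlex helper; the dict maps interesting origins to their current
# minimal failures, both represented as strings here for illustration):
def shortlex_demo(s):
    return (len(s), s)  # shorter strings first, ties broken lexicographically

pending = {"OriginB": "xy", "OriginA": "xy", "OriginC": "x"}
target, example = min(
    pending.items(), key=lambda kv: (shortlex_demo(kv[1]), shortlex_demo(repr(kv[0])))
)
assert (target, example) == ("OriginC", "x")  # smallest failure is shrunk first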

1586 def clear_secondary_key(self) -> None: 

1587 if self.has_existing_examples(): 

1588 # If we have any smaller examples in the secondary corpus, now is 

1589 # a good time to try them to see if they work as shrinks. They 

1590 # probably won't, but it's worth a shot and gives us a good 

1591 # opportunity to clear out the database. 

1592 

1593 # It's not worth trying the primary corpus because we already 

1594 # tried all of those in the initial phase. 

1595 corpus = sorted( 

1596 self.settings.database.fetch(self.secondary_key), key=shortlex 

1597 ) 

1598 for c in corpus: 

1599 choices = choices_from_bytes(c) 

1600 if choices is None: 

1601 self.settings.database.delete(self.secondary_key, c) 

1602 continue 

1603 primary = { 

1604 choices_to_bytes(v.choices) 

1605 for v in self.interesting_examples.values() 

1606 } 

1607 if shortlex(c) > max(map(shortlex, primary)): 

1608 break 

1609 

1610 self.cached_test_function(choices) 

1611 # We unconditionally remove c from the secondary key, as it

1612 # is either now primary or worse than our primary example

1613 # for this interesting origin.

1614 self.settings.database.delete(self.secondary_key, c) 

1615 
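# A minimal sketch of the pruning loop above, with raw bytes standing in for
# serialized choice sequences and shortlex_demo for the real shortlex helper
# (illustrative only):
def shortlex_demo(b):
    return (len(b), b)

primary = {b"abc"}
secondary = sorted([b"a", b"wxyz", b"zz"], key=shortlex_demo)
replayed = []
for c in secondary:
    if shortlex_demo(c) > max(map(shortlex_demo, primary)):
        break  # every remaining entry is larger than all primary examples
    replayed.append(c)  # would be replayed, then deleted from the database
assert replayed == [b"a", b"zz"]  # b"wxyz" is never replayed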

1616 def shrink( 

1617 self, 

1618 example: Union[ConjectureData, ConjectureResult], 

1619 predicate: Optional[ShrinkPredicateT] = None, 

1620 allow_transition: Optional[ 

1621 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1622 ] = None, 

1623 ) -> Union[ConjectureData, ConjectureResult]: 

1624 s = self.new_shrinker(example, predicate, allow_transition) 

1625 s.shrink() 

1626 return s.shrink_target 

1627 

1628 def new_shrinker( 

1629 self, 

1630 example: Union[ConjectureData, ConjectureResult], 

1631 predicate: Optional[ShrinkPredicateT] = None, 

1632 allow_transition: Optional[ 

1633 Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool] 

1634 ] = None, 

1635 ) -> Shrinker: 

1636 return Shrinker( 

1637 self, 

1638 example, 

1639 predicate, 

1640 allow_transition=allow_transition, 

1641 explain=Phase.explain in self.settings.phases, 

1642 in_target_phase=self._current_phase == "target", 

1643 ) 

1644 

1645 def passing_choice_sequences( 

1646 self, prefix: Sequence[ChoiceNode] = () 

1647 ) -> frozenset[tuple[ChoiceNode, ...]]: 

1648 """Return a collection of choice sequence nodes which cause the test to pass. 

1649 Optionally restrict this by a certain prefix, which is useful for explain mode. 

1650 """ 

1651 return frozenset( 

1652 cast(ConjectureResult, result).nodes 

1653 for key in self.__data_cache 

1654 if (result := self.__data_cache[key]).status is Status.VALID 

1655 and startswith(cast(ConjectureResult, result).nodes, prefix) 

1656 ) 
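
# A minimal sketch of the prefix restriction above, where startswith_demo
# stands in for the tuple-prefix helper imported from junkdrawer:
def startswith_demo(nodes, prefix):
    return nodes[: len(prefix)] == tuple(prefix)

passing = {(1, 2, 3), (1, 9), (4, 5)}
assert {n for n in passing if startswith_demo(n, (1,))} == {(1, 2, 3), (1, 9)}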

1657 

1658 

1659class ContainsDiscard(Exception): 

1660 pass