Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/hypothesis/internal/conjecture/engine.py: 18% (676 statements)

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import importlib
import inspect
import math
import textwrap
import time
from collections import defaultdict
from collections.abc import Generator, Sequence
from contextlib import contextmanager, suppress
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
from typing import Callable, Final, List, Literal, NoReturn, Optional, Union, cast

from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.database import ExampleDatabase, choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    FlakyReplay,
    HypothesisException,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import NotRequired, TypeAlias, TypedDict, ceil, override
from hypothesis.internal.conjecture.choice import (
    ChoiceConstraintsT,
    ChoiceKeyT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    choices_key,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    DataObserver,
    Overrun,
    Status,
    _Overrun,
)
from hypothesis.internal.conjecture.datatree import (
    DataTree,
    PreviouslyUnseenBehaviour,
    TreeRecordingObserver,
)
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    startswith,
)
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.providers import (
    AVAILABLE_PROVIDERS,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import Shrinker, ShrinkPredicateT, sort_key
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.reporting import base_report, report

MAX_SHRINKS: Final[int] = 500
CACHE_SIZE: Final[int] = 10000
MUTATION_POOL_SIZE: Final[int] = 100
MIN_TEST_CALLS: Final[int] = 10
BUFFER_SIZE: Final[int] = 8 * 1024

# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)
MAX_SHRINKING_SECONDS: Final[int] = 300
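# For example, a user who needs to keep shrinking past this limit could patch
# the constant before running tests (a sketch, not a supported API):
#   from hypothesis.internal.conjecture import engine
#   engine.MAX_SHRINKING_SECONDS = 24 * 60 * 60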

Ls: TypeAlias = list["Ls | int"]
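# e.g. [1, [2, [3]], 4] is a valid Ls: arbitrarily nested lists of ints.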


def shortlex(s):
    """Sort key for shortest-then-lexicographic ordering."""
    return (len(s), s)


@dataclass
class HealthCheckState:
    valid_examples: int = field(default=0)
    invalid_examples: int = field(default=0)
    overrun_examples: int = field(default=0)
    draw_times: "defaultdict[str, List[float]]" = field(
        default_factory=lambda: defaultdict(list)
    )

    @property
    def total_draw_time(self) -> float:
        return math.fsum(sum(self.draw_times.values(), start=[]))

    def timing_report(self) -> str:
        """Return a terminal report describing what was slow."""
        if not self.draw_times:
            return ""
        width = max(
            len(k.removeprefix("generate:").removesuffix(": ")) for k in self.draw_times
        )
        out = [f"\n {'':^{width}} count | fraction | slowest draws (seconds)"]
        args_in_order = sorted(self.draw_times.items(), key=lambda kv: -sum(kv[1]))
        for i, (argname, times) in enumerate(args_in_order):  # pragma: no branch
            # If we have very many unique keys, which can happen due to interactive
            # draws with computed labels, we'll skip uninformative rows.
            if (
                5 <= i < (len(self.draw_times) - 2)
                and math.fsum(times) * 20 < self.total_draw_time
            ):
                out.append(f" (skipped {len(self.draw_times) - i} rows of fast draws)")
                break
            # Compute the row to report, omitting times <1ms to focus on slow draws
            reprs = [f"{t:>6.3f}," for t in sorted(times)[-5:] if t > 5e-4]
            desc = " ".join(([" -- "] * 5 + reprs)[-5:]).rstrip(",")
            arg = argname.removeprefix("generate:").removesuffix(": ")
            out.append(
                f" {arg:^{width}} | {len(times):>4} | "
                f"{math.fsum(times)/self.total_draw_time:>7.0%} | {desc}"
            )
        return "\n".join(out)


class ExitReason(Enum):
    max_examples = "settings.max_examples={s.max_examples}"
    max_iterations = (
        "settings.max_examples={s.max_examples}, "
        "but < 10% of examples satisfied assumptions"
    )
    max_shrinks = f"shrunk example {MAX_SHRINKS} times"
    finished = "nothing left to do"
    flaky = "test was flaky"
    very_slow_shrinking = "shrinking was very slow"

    def describe(self, settings: Settings) -> str:
        return self.value.format(s=settings)
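        # e.g. ExitReason.max_examples.describe(settings) renders the template
        # above as "settings.max_examples=100" when settings.max_examples == 100.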


class RunIsComplete(Exception):
    pass


def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
    mname, cname = AVAILABLE_PROVIDERS[backend].rsplit(".", 1)
    provider_cls = getattr(importlib.import_module(mname), cname)
    if provider_cls.lifetime == "test_function":
        return provider_cls(None)
    elif provider_cls.lifetime == "test_case":
        return provider_cls
    else:
        raise InvalidArgument(
            f"invalid lifetime {provider_cls.lifetime} for provider {provider_cls.__name__}. "
            "Expected one of 'test_function', 'test_case'."
        )


class CallStats(TypedDict):
    status: str
    runtime: float
    drawtime: float
    gctime: float
    events: list[str]


PhaseStatistics = TypedDict(
    "PhaseStatistics",
    {
        "duration-seconds": float,
        "test-cases": list[CallStats],
        "distinct-failures": int,
        "shrinks-successful": int,
    },
)
StatisticsDict = TypedDict(
    "StatisticsDict",
    {
        "generate-phase": NotRequired[PhaseStatistics],
        "reuse-phase": NotRequired[PhaseStatistics],
        "shrink-phase": NotRequired[PhaseStatistics],
        "stopped-because": NotRequired[str],
        "targets": NotRequired[dict[str, float]],
        "nodeid": NotRequired[str],
    },
)


def choice_count(choices: Sequence[Union[ChoiceT, ChoiceTemplate]]) -> Optional[int]:
    count = 0
    for choice in choices:
        if isinstance(choice, ChoiceTemplate):
            if choice.count is None:
                return None
            count += choice.count
        else:
            count += 1
    return count
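    # e.g. (illustrative) choice_count([True, ChoiceTemplate("simplest", count=3)])
    # == 4, while any template with count=None makes the total unknowable, hence None.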


class DiscardObserver(DataObserver):
    @override
    def kill_branch(self) -> NoReturn:
        raise ContainsDiscard


def realize_choices(data: ConjectureData, *, for_failure: bool) -> None:
    # backwards-compatibility with backends without for_failure, can remove
    # in a few months
    kwargs = {}
    if for_failure:
        if "for_failure" in inspect.signature(data.provider.realize).parameters:
            kwargs["for_failure"] = True
        else:
            note_deprecation(
                f"{type(data.provider).__qualname__}.realize does not have the "
                "for_failure parameter. This will be an error in future versions "
                "of Hypothesis. (If you installed this backend from a separate "
                "package, upgrading that package may help).",
                has_codemod=False,
                since="2025-05-07",
            )

    for node in data.nodes:
        value = data.provider.realize(node.value, **kwargs)
        expected_type = {
            "string": str,
            "float": float,
            "integer": int,
            "boolean": bool,
            "bytes": bytes,
        }[node.type]
        if type(value) is not expected_type:
            raise HypothesisException(
                f"expected {expected_type} from "
                f"{data.provider.realize.__qualname__}, got {type(value)}"
            )

        constraints = cast(
            ChoiceConstraintsT,
            {
                k: data.provider.realize(v, **kwargs)
                for k, v in node.constraints.items()
            },
        )
        node.value = value
        node.constraints = constraints


class ConjectureRunner:
    def __init__(
        self,
        test_function: Callable[[ConjectureData], None],
        *,
        settings: Optional[Settings] = None,
        random: Optional[Random] = None,
        database_key: Optional[bytes] = None,
        ignore_limits: bool = False,
    ) -> None:
        self._test_function: Callable[[ConjectureData], None] = test_function
        self.settings: Settings = settings or Settings()
        self.shrinks: int = 0
        self.finish_shrinking_deadline: Optional[float] = None
        self.call_count: int = 0
        self.misaligned_count: int = 0
        self.valid_examples: int = 0
        self.invalid_examples: int = 0
        self.overrun_examples: int = 0
        self.random: Random = random or Random(getrandbits(128))
        self.database_key: Optional[bytes] = database_key
        self.ignore_limits: bool = ignore_limits

        # Global dict of per-phase statistics, and a list of per-call stats
        # which transfer to the global dict at the end of each phase.
        self._current_phase: str = "(not a phase)"
        self.statistics: StatisticsDict = {}
        self.stats_per_test_case: list[CallStats] = []

        # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
        self.interesting_examples: dict[
            Optional[InterestingOrigin], ConjectureResult
        ] = {}
        # We use call_count because there may be few possible valid_examples.
        self.first_bug_found_at: Optional[int] = None
        self.last_bug_found_at: Optional[int] = None

        # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
        self.shrunk_examples: set[Optional[InterestingOrigin]] = set()

        self.health_check_state: Optional[HealthCheckState] = None

        self.tree: DataTree = DataTree()

        self.provider: Union[type, PrimitiveProvider] = _get_provider(
            self.settings.backend
        )

        self.best_observed_targets: defaultdict[str, float] = defaultdict(
            lambda: NO_SCORE
        )
        self.best_examples_of_observed_targets: dict[str, ConjectureResult] = {}

        # We keep the pareto front in the example database if we have one. This
        # is only marginally useful at present, but speeds up local development
        # because it means that large targets will be quickly surfaced in your
        # testing.
        self.pareto_front: Optional[ParetoFront] = None
        if self.database_key is not None and self.settings.database is not None:
            self.pareto_front = ParetoFront(self.random)
            self.pareto_front.on_evict(self.on_pareto_evict)

        # We want to be able to get the ConjectureData object that results
        # from running a buffer without recalculating, especially during
        # shrinking where we need to know about the structure of the
        # executed test case.
        self.__data_cache = LRUReusedCache[
            tuple[ChoiceKeyT, ...], Union[ConjectureResult, _Overrun]
        ](CACHE_SIZE)

        self.reused_previously_shrunk_test_case: bool = False

        self.__pending_call_explanation: Optional[str] = None
        self._backend_found_failure: bool = False
        self._switch_to_hypothesis_provider: bool = False

        self.__failed_realize_count: int = 0
        # note unsound verification by alt backends
        self._verified_by: Optional[str] = None

    @property
    def using_hypothesis_backend(self) -> bool:
        return (
            self.settings.backend == "hypothesis" or self._switch_to_hypothesis_provider
        )

    def explain_next_call_as(self, explanation: str) -> None:
        self.__pending_call_explanation = explanation

    def clear_call_explanation(self) -> None:
        self.__pending_call_explanation = None

    @contextmanager
    def _log_phase_statistics(
        self, phase: Literal["reuse", "generate", "shrink"]
    ) -> Generator[None, None, None]:
        self.stats_per_test_case.clear()
        start_time = time.perf_counter()
        try:
            self._current_phase = phase
            yield
        finally:
            # We ignore the mypy type error here. Because `phase` is a string
            # literal and "-phase" is a string literal as well, the concatenation
            # will always be a valid key in the dictionary.
            self.statistics[phase + "-phase"] = {  # type: ignore
                "duration-seconds": time.perf_counter() - start_time,
                "test-cases": list(self.stats_per_test_case),
                "distinct-failures": len(self.interesting_examples),
                "shrinks-successful": self.shrinks,
            }

    @property
    def should_optimise(self) -> bool:
        return Phase.target in self.settings.phases

    def __tree_is_exhausted(self) -> bool:
        return self.tree.is_exhausted and self.using_hypothesis_backend

    def __stoppable_test_function(self, data: ConjectureData) -> None:
        """Run ``self._test_function``, but convert a ``StopTest`` exception
        into a normal return and avoid raising anything flaky for RecursionErrors.
        """
        # We ensure that the test has this much stack space remaining, no
        # matter the size of the stack when called, to de-flake RecursionErrors
        # (#2494, #3671). Note, this covers the data generation part of the test;
        # the actual test execution is additionally protected at the call site
        # in hypothesis.core.execute_once.
        with ensure_free_stackframes():
            try:
                self._test_function(data)
            except StopTest as e:
                if e.testcounter == data.testcounter:
                    # This StopTest has successfully stopped its test, and can now
                    # be discarded.
                    pass
                else:
                    # This StopTest was raised by a different ConjectureData. We
                    # need to re-raise it so that it will eventually reach the
                    # correct engine.
                    raise

    def _cache_key(self, choices: Sequence[ChoiceT]) -> tuple[ChoiceKeyT, ...]:
        return choices_key(choices)

    def _cache(self, data: ConjectureData) -> None:
        result = data.as_result()
        key = self._cache_key(data.choices)
        self.__data_cache[key] = result

    def cached_test_function(
        self,
        choices: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        error_on_discard: bool = False,
        extend: Union[int, Literal["full"]] = 0,
    ) -> Union[ConjectureResult, _Overrun]:
        """
        If ``error_on_discard`` is set to True this will raise ``ContainsDiscard``
        in preference to running the actual test function. This is to allow us
        to skip test cases we expect to be redundant in some cases. Note that
        it may be the case that we don't raise ``ContainsDiscard`` even if the
        result has discards if we cannot determine from previous runs whether
        it will have a discard.
        """
        # node templates represent a not-yet-filled hole and therefore cannot
        # be cached or retrieved from the cache.
        if not any(isinstance(choice, ChoiceTemplate) for choice in choices):
            # this type cast is validated by the isinstance check above (ie, there
            # are no ChoiceTemplate elements).
            choices = cast(Sequence[ChoiceT], choices)
            key = self._cache_key(choices)
            try:
                cached = self.__data_cache[key]
                # If we have a cached overrun for this key, but we're allowing
                # extensions of the nodes, it could in fact run to valid data
                # if we try.
                if extend == 0 or cached.status is not Status.OVERRUN:
                    return cached
            except KeyError:
                pass

        if extend == "full":
            max_length = None
        elif (count := choice_count(choices)) is None:
            max_length = None
        else:
            max_length = count + extend

        # explicitly use a no-op DataObserver here instead of a TreeRecordingObserver.
        # The reason is we don't expect simulate_test_function to explore new choices
        # and write back to the tree, so we don't want the overhead of the
        # TreeRecordingObserver tracking those calls.
        trial_observer: Optional[DataObserver] = DataObserver()
        if error_on_discard:
            trial_observer = DiscardObserver()

        try:
            trial_data = self.new_conjecture_data(
                choices, observer=trial_observer, max_choices=max_length
            )
            self.tree.simulate_test_function(trial_data)
        except PreviouslyUnseenBehaviour:
            pass
        else:
            trial_data.freeze()
            key = self._cache_key(trial_data.choices)
            if trial_data.status > Status.OVERRUN:
                try:
                    return self.__data_cache[key]
                except KeyError:
                    pass
            else:
                # if we simulated to an overrun, then our result is certainly
                # an overrun; no need to consult the cache. (and we store this
                # result for simulation-less lookup later).
                self.__data_cache[key] = Overrun
                return Overrun
            try:
                return self.__data_cache[key]
            except KeyError:
                pass

        data = self.new_conjecture_data(choices, max_choices=max_length)
        # note that calling test_function caches `data` for us, for both an ir
        # tree key and a buffer key.
        self.test_function(data)
        return data.as_result()

    def test_function(self, data: ConjectureData) -> None:
        if self.__pending_call_explanation is not None:
            self.debug(self.__pending_call_explanation)
            self.__pending_call_explanation = None

        self.call_count += 1
        interrupted = False

        try:
            self.__stoppable_test_function(data)
        except KeyboardInterrupt:
            interrupted = True
            raise
        except BackendCannotProceed as exc:
            if exc.scope in ("verified", "exhausted"):
                self._switch_to_hypothesis_provider = True
                if exc.scope == "verified":
                    self._verified_by = self.settings.backend
            elif exc.scope == "discard_test_case":
                self.__failed_realize_count += 1
                if (
                    self.__failed_realize_count > 10
                    and (self.__failed_realize_count / self.call_count) > 0.2
                ):
                    self._switch_to_hypothesis_provider = True

            # treat all BackendCannotProceed exceptions as invalid. This isn't
            # great; "verified" should really be counted as self.valid_examples += 1.
            # But we check self.valid_examples == 0 to determine whether to raise
            # Unsatisfiable, and that would throw this check off.
            self.invalid_examples += 1

            # skip the post-test-case tracking; we're pretending this never happened
            interrupted = True
            data.cannot_proceed_scope = exc.scope
            data.freeze()
            return
        except BaseException:
            data.freeze()
            if self.settings.backend != "hypothesis":
                realize_choices(data, for_failure=True)
            self.save_choices(data.choices)
            raise
        finally:
            # No branch, because if we're interrupted we always raise
            # the KeyboardInterrupt, never continue to the code below.
            if not interrupted:  # pragma: no branch
                assert data.cannot_proceed_scope is None
                data.freeze()
                call_stats: CallStats = {
                    "status": data.status.name.lower(),
                    "runtime": data.finish_time - data.start_time,
                    "drawtime": math.fsum(data.draw_times.values()),
                    "gctime": data.gc_finish_time - data.gc_start_time,
                    "events": sorted(
                        k if v == "" else f"{k}: {v}" for k, v in data.events.items()
                    ),
                }
                self.stats_per_test_case.append(call_stats)
                if self.settings.backend != "hypothesis":
                    realize_choices(data, for_failure=data.status is Status.INTERESTING)

                self._cache(data)
                if data.misaligned_at is not None:  # pragma: no branch # coverage bug?
                    self.misaligned_count += 1

        self.debug_data(data)

        if (
            data.target_observations
            and self.pareto_front is not None
            and self.pareto_front.add(data.as_result())
        ):
            self.save_choices(data.choices, sub_key=b"pareto")

        if data.status >= Status.VALID:
            for k, v in data.target_observations.items():
                self.best_observed_targets[k] = max(self.best_observed_targets[k], v)

                if k not in self.best_examples_of_observed_targets:
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result
                    continue

                existing_example = self.best_examples_of_observed_targets[k]
                existing_score = existing_example.target_observations[k]

                if v < existing_score:
                    continue

                if v > existing_score or sort_key(data.nodes) < sort_key(
                    existing_example.nodes
                ):
                    data_as_result = data.as_result()
                    assert not isinstance(data_as_result, _Overrun)
                    self.best_examples_of_observed_targets[k] = data_as_result

        if data.status is Status.VALID:
            self.valid_examples += 1
        if data.status is Status.INVALID:
            self.invalid_examples += 1
        if data.status is Status.OVERRUN:
            self.overrun_examples += 1

        if data.status == Status.INTERESTING:
            if not self.using_hypothesis_backend:
                # replay this failure on the hypothesis backend to ensure it still
                # finds a failure. otherwise, it is flaky.
                initial_origin = data.interesting_origin
                initial_traceback = data.expected_traceback
                data = ConjectureData.for_choices(data.choices)
                self.__stoppable_test_function(data)
                data.freeze()
                # TODO: Convert to FlakyFailure on the way out. Should same-origin
                # also be checked?
                if data.status != Status.INTERESTING:
                    desc_new_status = {
                        data.status.VALID: "passed",
                        data.status.INVALID: "failed filters",
                        data.status.OVERRUN: "overran",
                    }[data.status]
                    wrapped_tb = (
                        ""
                        if initial_traceback is None
                        else textwrap.indent(initial_traceback, " | ")
                    )
                    raise FlakyReplay(
                        f"Inconsistent results from replaying a failing test case!\n"
                        f"{wrapped_tb}on backend={self.settings.backend!r} but "
                        f"{desc_new_status} under backend='hypothesis'",
                        interesting_origins=[initial_origin],
                    )

            self._cache(data)

            key = data.interesting_origin
            changed = False
            try:
                existing = self.interesting_examples[key]
            except KeyError:
                changed = True
                self.last_bug_found_at = self.call_count
                if self.first_bug_found_at is None:
                    self.first_bug_found_at = self.call_count
            else:
                if sort_key(data.nodes) < sort_key(existing.nodes):
                    self.shrinks += 1
                    self.downgrade_choices(existing.choices)
                    self.__data_cache.unpin(self._cache_key(existing.choices))
                    changed = True

            if changed:
                self.save_choices(data.choices)
                self.interesting_examples[key] = data.as_result()  # type: ignore
                if not self.using_hypothesis_backend:
                    self._backend_found_failure = True
                self.__data_cache.pin(self._cache_key(data.choices), data.as_result())
                self.shrunk_examples.discard(key)

            if self.shrinks >= MAX_SHRINKS:
                self.exit_with(ExitReason.max_shrinks)

        if (
            not self.ignore_limits
            and self.finish_shrinking_deadline is not None
            and self.finish_shrinking_deadline < time.perf_counter()
        ):
            # See https://github.com/HypothesisWorks/hypothesis/issues/2340
            report(
                "WARNING: Hypothesis has spent more than five minutes working to shrink"
                " a failing example, and stopped because it is making very slow"
                " progress. When you re-run your tests, shrinking will resume and may"
                " take this long before aborting again.\nPLEASE REPORT THIS if you can"
                " provide a reproducing example, so that we can improve shrinking"
                " performance for everyone."
            )
            self.exit_with(ExitReason.very_slow_shrinking)

        if not self.interesting_examples:
            # Note that this logic is reproduced to end the generation phase when
            # we have interesting examples. Update that too if you change this!
            # (The doubled implementation is because here we exit the engine entirely,
            # while in the other case below we just want to move on to shrinking.)
            if self.valid_examples >= self.settings.max_examples:
                self.exit_with(ExitReason.max_examples)
            if self.call_count >= max(
                self.settings.max_examples * 10,
                # We have a high-ish default max iterations, so that tests
                # don't become flaky when max_examples is too low.
                1000,
            ):
                self.exit_with(ExitReason.max_iterations)

        if self.__tree_is_exhausted():
            self.exit_with(ExitReason.finished)

        self.record_for_health_check(data)

    def on_pareto_evict(self, data: ConjectureResult) -> None:
        self.settings.database.delete(self.pareto_key, choices_to_bytes(data.choices))

    def generate_novel_prefix(self) -> tuple[ChoiceT, ...]:
        """Uses the tree to proactively generate a starting choice sequence
        that we haven't explored yet for this test.

        When this method is called, we assume that there must be at
        least one novel prefix left to find. If there were not, then the
        test run should have already stopped due to tree exhaustion.
        """
        return self.tree.generate_novel_prefix(self.random)

    def record_for_health_check(self, data: ConjectureData) -> None:
        # Once we've actually found a bug, there's no point in trying to run
        # health checks - they'll just mask the actually important information.
        if data.status == Status.INTERESTING:
            self.health_check_state = None

        state = self.health_check_state

        if state is None:
            return

        for k, v in data.draw_times.items():
            state.draw_times[k].append(v)

        if data.status == Status.VALID:
            state.valid_examples += 1
        elif data.status == Status.INVALID:
            state.invalid_examples += 1
        else:
            assert data.status == Status.OVERRUN
            state.overrun_examples += 1

        max_valid_draws = 10
        max_invalid_draws = 50
        max_overrun_draws = 20

        assert state.valid_examples <= max_valid_draws

        if state.valid_examples == max_valid_draws:
            self.health_check_state = None
            return

        if state.overrun_examples == max_overrun_draws:
            fail_health_check(
                self.settings,
                "Examples routinely exceeded the max allowable size. "
                f"({state.overrun_examples} examples overran while generating "
                f"{state.valid_examples} valid ones). Generating examples this large "
                "will usually lead to bad results. You could try setting max_size "
                "parameters on your collections and turning max_leaves down on "
                "recursive() calls.",
                HealthCheck.data_too_large,
            )
        if state.invalid_examples == max_invalid_draws:
            fail_health_check(
                self.settings,
                "It looks like your strategy is filtering out a lot of data. Health "
                f"check found {state.invalid_examples} filtered examples but only "
                f"{state.valid_examples} good ones. This will make your tests much "
                "slower, and also will probably distort the data generation quite a "
                "lot. You should adapt your strategy to filter less. This can also "
                "be caused by a low max_leaves parameter in recursive() calls",
                HealthCheck.filter_too_much,
            )

        draw_time = state.total_draw_time

        # Allow at least the greater of one second or 5x the deadline. If deadline
        # is None, allow 30s - the user can disable the healthcheck too if desired.
        draw_time_limit = 5 * (self.settings.deadline or timedelta(seconds=6))
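        # e.g. with deadline=None the allowance is max(1.0, 30.0) == 30 seconds.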

        if draw_time > max(1.0, draw_time_limit.total_seconds()):
            fail_health_check(
                self.settings,
                "Data generation is extremely slow: Only produced "
                f"{state.valid_examples} valid examples in {draw_time:.2f} seconds "
                f"({state.invalid_examples} invalid ones and {state.overrun_examples} "
                "exceeded maximum size). Try decreasing size of the data you're "
                "generating (with e.g. max_size or max_leaves parameters)."
                + state.timing_report(),
                HealthCheck.too_slow,
            )

    def save_choices(
        self, choices: Sequence[ChoiceT], sub_key: Optional[bytes] = None
    ) -> None:
        if self.settings.database is not None:
            key = self.sub_key(sub_key)
            if key is None:
                return
            self.settings.database.save(key, choices_to_bytes(choices))

    def downgrade_choices(self, choices: Sequence[ChoiceT]) -> None:
        buffer = choices_to_bytes(choices)
        if self.settings.database is not None and self.database_key is not None:
            self.settings.database.move(self.database_key, self.secondary_key, buffer)

    def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
        if self.database_key is None:
            return None
        if sub_key is None:
            return self.database_key
        return b".".join((self.database_key, sub_key))
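        # e.g. with database_key == b"abc", sub_key(b"secondary") returns
        # b"abc.secondary", namespacing the secondary corpus under the primary key.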

    @property
    def secondary_key(self) -> Optional[bytes]:
        return self.sub_key(b"secondary")

    @property
    def pareto_key(self) -> Optional[bytes]:
        return self.sub_key(b"pareto")

    def debug(self, message: str) -> None:
        if self.settings.verbosity >= Verbosity.debug:
            base_report(message)

    @property
    def report_debug_info(self) -> bool:
        return self.settings.verbosity >= Verbosity.debug

    def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
        if not self.report_debug_info:
            return

        status = repr(data.status)
        if data.status == Status.INTERESTING:
            status = f"{status} ({data.interesting_origin!r})"

        self.debug(
            f"{len(data.choices)} choices {data.choices} -> {status}"
            f"{', ' + data.output if data.output else ''}"
        )

    def run(self) -> None:
        with local_settings(self.settings):
            try:
                self._run()
            except RunIsComplete:
                pass
            for v in self.interesting_examples.values():
                self.debug_data(v)
            self.debug(
                "Run complete after %d examples (%d valid) and %d shrinks"
                % (self.call_count, self.valid_examples, self.shrinks)
            )

    @property
    def database(self) -> Optional[ExampleDatabase]:
        if self.database_key is None:
            return None
        return self.settings.database

    def has_existing_examples(self) -> bool:
        return self.database is not None and Phase.reuse in self.settings.phases

    def reuse_existing_examples(self) -> None:
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            self.debug("Reusing examples from database")
            # We have to do some careful juggling here. We have two database
            # corpora: The primary and secondary. The primary corpus is a
            # small set of minimized examples each of which has at one point
            # demonstrated a distinct bug. We want to retry all of these.

            # We also have a secondary corpus of examples that have at some
            # point demonstrated interestingness (currently only ones that
            # were previously non-minimal examples of a bug, but this will
            # likely expand in future). These are a good source of potentially
            # interesting examples, but there are a lot of them, so we down
            # sample the secondary corpus to a more manageable size.

            corpus = sorted(
                self.settings.database.fetch(self.database_key), key=shortlex
            )
            factor = 0.1 if (Phase.generate in self.settings.phases) else 1
            desired_size = max(2, ceil(factor * self.settings.max_examples))
            primary_corpus_size = len(corpus)

            if len(corpus) < desired_size:
                extra_corpus = list(self.settings.database.fetch(self.secondary_key))

                shortfall = desired_size - len(corpus)

                if len(extra_corpus) <= shortfall:
                    extra = extra_corpus
                else:
                    extra = self.random.sample(extra_corpus, shortfall)
                extra.sort(key=shortlex)
                corpus.extend(extra)

            # We want a fast path where every primary entry in the database was
            # interesting.
            found_interesting_in_primary = False
            all_interesting_in_primary_were_exact = True

            for i, existing in enumerate(corpus):
                if i >= primary_corpus_size and found_interesting_in_primary:
                    break
                choices = choices_from_bytes(existing)
                if choices is None:
                    # clear out any keys which fail deserialization
                    self.settings.database.delete(self.database_key, existing)
                    continue
                data = self.cached_test_function(choices, extend="full")
                if data.status != Status.INTERESTING:
                    self.settings.database.delete(self.database_key, existing)
                    self.settings.database.delete(self.secondary_key, existing)
                else:
                    if i < primary_corpus_size:
                        found_interesting_in_primary = True
                        assert not isinstance(data, _Overrun)
                        if choices_key(choices) != choices_key(data.choices):
                            all_interesting_in_primary_were_exact = False
                    if not self.settings.report_multiple_bugs:
                        break
            if found_interesting_in_primary:
                if all_interesting_in_primary_were_exact:
                    self.reused_previously_shrunk_test_case = True

            # Because self.database is not None (because self.has_existing_examples())
            # and self.database_key is not None (because we fetched using it above),
            # we can guarantee self.pareto_front is not None
            assert self.pareto_front is not None

            # If we've not found any interesting examples so far we try some of
            # the pareto front from the last run.
            if len(corpus) < desired_size and not self.interesting_examples:
                desired_extra = desired_size - len(corpus)
                pareto_corpus = list(self.settings.database.fetch(self.pareto_key))
                if len(pareto_corpus) > desired_extra:
                    pareto_corpus = self.random.sample(pareto_corpus, desired_extra)
                pareto_corpus.sort(key=shortlex)

                for existing in pareto_corpus:
                    choices = choices_from_bytes(existing)
                    if choices is None:
                        self.settings.database.delete(self.pareto_key, existing)
                        continue
                    data = self.cached_test_function(choices, extend="full")
                    if data not in self.pareto_front:
                        self.settings.database.delete(self.pareto_key, existing)
                    if data.status == Status.INTERESTING:
                        break

    def exit_with(self, reason: ExitReason) -> None:
        if self.ignore_limits:
            return
        self.statistics["stopped-because"] = reason.describe(self.settings)
        if self.best_observed_targets:
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug(f"exit_with({reason.name})")
        self.exit_reason = reason
        raise RunIsComplete

    def should_generate_more(self) -> bool:
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # Users who disable shrinking probably want to exit as fast as possible.
        # If we've found a bug and won't report more than one, stop looking.
        elif (
            Phase.shrink not in self.settings.phases
            or not self.settings.report_multiple_bugs
        ):
            return False
        assert isinstance(self.first_bug_found_at, int)
        assert isinstance(self.last_bug_found_at, int)
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
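        # e.g. a first bug at call 50 and the most recent at call 300 allows up
        # to min(50 + 1000, 300 * 2) == 600 calls before moving on to shrinking.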

        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )

    def generate_new_examples(self) -> None:
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        self._switch_to_hypothesis_provider = True
        zero_data = self.cached_test_function((ChoiceTemplate("simplest", count=None),))
        if zero_data.status > Status.OVERRUN:
            assert isinstance(zero_data, ConjectureResult)
            # if the crosshair backend cannot proceed, it does not (and cannot)
            # realize the symbolic values, with the intent that Hypothesis will
            # throw away this test case. We usually do, but if it's the zero data
            # then we try to pin it here, which requires realizing the symbolics.
            #
            # We don't (yet) rely on the zero data being pinned, so it costs us
            # only a very slight performance loss to skip pinning it when doing
            # so would error.
            if zero_data.cannot_proceed_scope is None:  # pragma: no branch
                self.__data_cache.pin(
                    self._cache_key(zero_data.choices), zero_data.as_result()
                )  # Pin forever

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID
            and isinstance(zero_data, ConjectureResult)
            and zero_data.length * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        #
        # We previously set a minimum value of 10 on small_example_cap, with the
        # reasoning of avoiding flaky health checks. However, some users set a
        # low max_examples for performance. A hard lower bound in this case biases
        # the distribution towards small (and less powerful) examples. Flaky
        # and loud health checks are better than silent performance degradation.
        small_example_cap = min(self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1, 10)
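        # e.g. max_examples == 100 gives small_example_cap == 10 and
        # optimise_at == 50, so targeted optimisation starts about halfway through.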

        ran_optimisations = False
        self._switch_to_hypothesis_provider = False

        while self.should_generate_more():
            # we don't yet integrate DataTree with backends. Instead of generating
            # a novel prefix, ask the backend for an input.
            if not self.using_hypothesis_backend:
                data = self.new_conjecture_data([])
                with suppress(BackendCannotProceed):
                    self.test_function(data)
                continue

            self._current_phase = "generate"
            prefix = self.generate_novel_prefix()
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + (ChoiceTemplate("simplest", count=None),)
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                # Because the Status code is greater than Status.VALID, it cannot be
                # Status.OVERRUN, which guarantees that the minimal_example is a
                # ConjectureResult object.
                assert isinstance(minimal_example, ConjectureResult)
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.choices) - len(prefix)
                max_length = len(prefix) + minimal_extension * 5

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                trial_data = self.new_conjecture_data(prefix, max_choices=max_length)
                try:
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                assert isinstance(trial_data.observer, TreeRecordingObserver)
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.choices
            else:
                max_length = None

            data = self.new_conjecture_data(prefix, max_choices=max_length)
            self.test_function(data)

            if (
                data.status is Status.OVERRUN
                and max_length is not None
                and "invalid because" not in data.events
            ):
                data.events["invalid because"] = (
                    "reduced max size for early examples (avoids flaky health checks)"
                )

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self._current_phase = "target"
                self.optimise_targets()

    def generate_mutations_from(
        self, data: Union[ConjectureData, ConjectureResult]
    ) -> None:
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.spans.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)
                (start1, end1), (start2, end2) = self.random.sample(sorted(group), 2)
                if start1 > start2:
                    (start1, end1), (start2, end2) = (start2, end2), (start1, end1)

                if (
                    start1 <= start2 <= end2 <= end1
                ):  # pragma: no cover # flaky on conjecture-cover tests
                    # One span entirely contains the other. The strategy is very
                    # likely some kind of tree. e.g. we might have
                    #
                    #                    ┌─────┐
                    #         ┌──────────┤  a  ├──────────┐
                    #         │          └─────┘          │
                    #      ┌──┴──┐                     ┌──┴──┐
                    #   ┌──┤  b  ├──┐               ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │               │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐           ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ d │ │ e │ │ f │           │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘           └───┘ └───┘ └───┘
                    #
                    # where each node is drawn from the same strategy and so
                    # has the same span label. We might have selected the spans
                    # corresponding to the a and c nodes, which is the entire
                    # tree and the subtree of (and including) c respectively.
                    #
                    # There are two possible mutations we could apply in this case:
                    # 1. replace a with c (replace child with parent)
                    # 2. replace c with a (replace parent with child)
                    #
                    # (1) results in multiple partial copies of the
                    # parent:
                    #                       ┌─────┐
                    #         ┌─────────────┤  a  ├─────────────┐
                    #         │             └─────┘             │
                    #      ┌──┴──┐                          ┌──┴──┐
                    #   ┌──┤  b  ├──┐            ┌──────────┤  a  ├──────────┐
                    #   │  └──┬──┘  │            │          └─────┘          │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐       ┌──┴──┐                     ┌──┴──┐
                    # │ d │ │ e │ │ f │    ┌──┤  b  ├──┐               ┌──┤  c  ├──┐
                    # └───┘ └───┘ └───┘    │  └──┬──┘  │               │  └──┬──┘  │
                    #                    ┌─┴─┐ ┌─┴─┐ ┌─┴─┐           ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    #                    │ d │ │ e │ │ f │           │ g │ │ h │ │ i │
                    #                    └───┘ └───┘ └───┘           └───┘ └───┘ └───┘
                    #
                    # While (2) results in truncating part of the parent:
                    #
                    #      ┌─────┐
                    #   ┌──┤  c  ├──┐
                    #   │  └──┬──┘  │
                    # ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
                    # │ g │ │ h │ │ i │
                    # └───┘ └───┘ └───┘
                    #
                    # (1) is the same as Example IV.4. in Nautilus (NDSS '19)
                    # (https://wcventure.github.io/FuzzingPaper/Paper/NDSS19_Nautilus.pdf),
                    # except we do not repeat the replacement additional times
                    # (the paper repeats it once for a total of two copies).
                    #
                    # We currently only apply mutation (1), and ignore mutation
                    # (2). The reason is that the attempt generated from (2) is
                    # always something that Hypothesis could easily have generated
                    # itself, by simply not making various choices. Whereas
                    # duplicating the exact value + structure of particular choices
                    # in (1) would have been hard for Hypothesis to generate by
                    # chance.
                    #
                    # TODO: an extension of this mutation might repeat (1) on
                    # a geometric distribution between 0 and ~10 times. We would
                    # need to find the corresponding span to recurse on in the new
                    # choices, probably just by using the choices index.

                    # case (1): duplicate the choices in start1:start2.
                    attempt = data.choices[:start2] + data.choices[start1:]
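                    # e.g. with choices (x, y, z, w), start1 == 0 and start2 == 2,
                    # this yields (x, y, x, y, z, w): choices[start1:start2] is
                    # duplicated in place at the start of the inner span.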

                else:
                    (start, end) = self.random.choice([(start1, end1), (start2, end2)])
                    replacement = data.choices[start:end]
                    # We attempt to replace both the examples with
                    # whichever choice we made. Note that this might end
                    # up messing up and getting the example boundaries
                    # wrong - labels matching are only a best guess as to
                    # whether the two are equivalent - but it doesn't
                    # really matter. It may not achieve the desired result,
                    # but it's still a perfectly acceptable choice sequence
                    # to try.
                    attempt = (
                        data.choices[:start1]
                        + replacement
                        + data.choices[end1:start2]
                        + replacement
                        + data.choices[end2:]
                    )
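                    # e.g. choosing (start, end) == (start2, end2) rewrites
                    # (.., span1, .., span2, ..) into (.., span2, .., span2, ..),
                    # copying the chosen span into both positions.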

                try:
                    new_data = self.cached_test_function(
                        attempt,
                        # We set error_on_discard so that we don't end up
                        # entering parts of the tree we consider redundant
                        # and not worth exploring.
                        error_on_discard=True,
                    )
                except ContainsDiscard:
                    failed_mutations += 1
                    continue

                if new_data is Overrun:
                    failed_mutations += 1  # pragma: no cover # annoying case
                else:
                    assert isinstance(new_data, ConjectureResult)
                    if (
                        new_data.status >= data.status
                        and choices_key(data.choices) != choices_key(new_data.choices)
                        and all(
                            k in new_data.target_observations
                            and new_data.target_observations[k] >= v
                            for k, v in data.target_observations.items()
                        )
                    ):
                        data = new_data
                        failed_mutations = 0
                    else:
                        failed_mutations += 1

    def optimise_targets(self) -> None:
        """If any target observations have been made, attempt to optimise them
        all."""
        if not self.should_optimise:
            return
        from hypothesis.internal.conjecture.optimiser import Optimiser

        # We want to avoid running the optimiser for too long in case we hit
        # an unbounded target score. We start this off fairly conservatively
        # in case interesting examples are easy to find and then ramp it up
        # on an exponential schedule so we don't hamper the optimiser too much
        # if it needs a long time to find good enough improvements.
        max_improvements = 10
        while True:
            prev_calls = self.call_count

            any_improvements = False

            for target, data in list(self.best_examples_of_observed_targets.items()):
                optimiser = Optimiser(
                    self, data, target, max_improvements=max_improvements
                )
                optimiser.run()
                if optimiser.improvements > 0:
                    any_improvements = True

            if self.interesting_examples:
                break

            max_improvements *= 2

            if any_improvements:
                continue

            if self.best_observed_targets:
                self.pareto_optimise()

            if prev_calls == self.call_count:
                break

    def pareto_optimise(self) -> None:
        if self.pareto_front is not None:
            ParetoOptimiser(self).run()

    def _run(self) -> None:
        # have to use the primitive provider to interpret database bits...
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("reuse"):
            self.reuse_existing_examples()
            # Fast path for development: If the database gave us interesting
            # examples from the previously stored primary key, don't try
            # shrinking it again as it's unlikely to work.
            if self.reused_previously_shrunk_test_case:
                self.exit_with(ExitReason.finished)
        # ...but we should use the supplied provider when generating...
        self._switch_to_hypothesis_provider = False
        with self._log_phase_statistics("generate"):
            self.generate_new_examples()
            # We normally run the targeting phase mixed in with the generate phase,
            # but if we've been asked to run it but not generation then we have to
            # run it explicitly on its own here.
            if Phase.generate not in self.settings.phases:
                self._current_phase = "target"
                self.optimise_targets()
        # ...and back to the primitive provider when shrinking.
        self._switch_to_hypothesis_provider = True
        with self._log_phase_statistics("shrink"):
            self.shrink_interesting_examples()
        self.exit_with(ExitReason.finished)

    def new_conjecture_data(
        self,
        prefix: Sequence[Union[ChoiceT, ChoiceTemplate]],
        *,
        observer: Optional[DataObserver] = None,
        max_choices: Optional[int] = None,
    ) -> ConjectureData:
        provider = (
            HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
        )
        observer = observer or self.tree.new_observer()
        if not self.using_hypothesis_backend:
            observer = DataObserver()

        return ConjectureData(
            prefix=prefix,
            observer=observer,
            provider=provider,
            max_choices=max_choices,
            random=self.random,
        )

    def shrink_interesting_examples(self) -> None:
        """If we've found interesting examples, try to replace each of them
        with a minimal interesting example with the same interesting_origin.

        We may find one or more examples with a new interesting_origin
        during the shrink process. If so we shrink these too.
        """
        if Phase.shrink not in self.settings.phases or not self.interesting_examples:
            return

        self.debug("Shrinking interesting examples")
        self.finish_shrinking_deadline = time.perf_counter() + MAX_SHRINKING_SECONDS

        for prev_data in sorted(
            self.interesting_examples.values(), key=lambda d: sort_key(d.nodes)
        ):
            assert prev_data.status == Status.INTERESTING
            data = self.new_conjecture_data(prev_data.choices)
            self.test_function(data)
            if data.status != Status.INTERESTING:
                self.exit_with(ExitReason.flaky)

        self.clear_secondary_key()

        while len(self.shrunk_examples) < len(self.interesting_examples):
            target, example = min(
                (
                    (k, v)
                    for k, v in self.interesting_examples.items()
                    if k not in self.shrunk_examples
                ),
                key=lambda kv: (sort_key(kv[1].nodes), shortlex(repr(kv[0]))),
            )
            self.debug(f"Shrinking {target!r}: {example.choices}")

            if not self.settings.report_multiple_bugs:
                # If multi-bug reporting is disabled, we shrink our currently-minimal
                # failure, allowing 'slips' to any bug with a smaller minimal example.
                self.shrink(example, lambda d: d.status == Status.INTERESTING)
                return

            def predicate(d: Union[ConjectureResult, _Overrun]) -> bool:
                if d.status < Status.INTERESTING:
                    return False
                d = cast(ConjectureResult, d)
                return d.interesting_origin == target

            self.shrink(example, predicate)

            self.shrunk_examples.add(target)

    def clear_secondary_key(self) -> None:
        if self.has_existing_examples():
            # If we have any smaller examples in the secondary corpus, now is
            # a good time to try them to see if they work as shrinks. They
            # probably won't, but it's worth a shot and gives us a good
            # opportunity to clear out the database.

            # It's not worth trying the primary corpus because we already
            # tried all of those in the initial phase.
            corpus = sorted(
                self.settings.database.fetch(self.secondary_key), key=shortlex
            )
            for c in corpus:
                choices = choices_from_bytes(c)
                if choices is None:
                    self.settings.database.delete(self.secondary_key, c)
                    continue
                primary = {
                    choices_to_bytes(v.choices)
                    for v in self.interesting_examples.values()
                }
                if shortlex(c) > max(map(shortlex, primary)):
                    break

                self.cached_test_function(choices)
                # We unconditionally remove c from the secondary key as it
                # is either now primary or worse than our primary example
                # of this reason for interestingness.
                self.settings.database.delete(self.secondary_key, c)

    def shrink(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Union[ConjectureData, ConjectureResult]:
        s = self.new_shrinker(example, predicate, allow_transition)
        s.shrink()
        return s.shrink_target

    def new_shrinker(
        self,
        example: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT] = None,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ] = None,
    ) -> Shrinker:
        return Shrinker(
            self,
            example,
            predicate,
            allow_transition=allow_transition,
            explain=Phase.explain in self.settings.phases,
            in_target_phase=self._current_phase == "target",
        )

    def passing_choice_sequences(
        self, prefix: Sequence[ChoiceNode] = ()
    ) -> frozenset[tuple[ChoiceNode, ...]]:
        """Return a collection of choice sequence nodes which cause the test to pass.
        Optionally restrict this by a certain prefix, which is useful for explain mode.
        """
        return frozenset(
            cast(ConjectureResult, result).nodes
            for key in self.__data_cache
            if (result := self.__data_cache[key]).status is Status.VALID
            and startswith(cast(ConjectureResult, result).nodes, prefix)
        )


class ContainsDiscard(Exception):
    pass