# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""This module provides the core primitives of Hypothesis, such as given."""
import base64
import contextlib
import dataclasses
import datetime
import inspect
import io
import math
import sys
import time
import traceback
import types
import unittest
import warnings
import zlib
from collections import defaultdict
from collections.abc import Coroutine, Generator, Hashable, Iterable, Sequence
from dataclasses import dataclass, field
from functools import partial
from inspect import Parameter
from random import Random
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    TypeVar,
    Union,
    overload,
)
from unittest import TestCase

from hypothesis import strategies as st
from hypothesis._settings import (
    HealthCheck,
    Phase,
    Verbosity,
    all_settings,
    local_settings,
    settings as Settings,
)
from hypothesis.control import BuildContext, currently_in_test_context
from hypothesis.database import choices_from_bytes, choices_to_bytes
from hypothesis.errors import (
    BackendCannotProceed,
    DeadlineExceeded,
    DidNotReproduce,
    FailedHealthCheck,
    FlakyFailure,
    FlakyReplay,
    Found,
    Frozen,
    HypothesisException,
    HypothesisWarning,
    InvalidArgument,
    NoSuchExample,
    StopTest,
    Unsatisfiable,
    UnsatisfiedAssumption,
)
from hypothesis.internal.compat import (
    PYPY,
    BaseExceptionGroup,
    add_note,
    bad_django_TestCase,
    get_type_hints,
    int_from_bytes,
)
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.internal.conjecture.data import ConjectureData, Status
from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    gc_cumulative_time,
)
from hypothesis.internal.conjecture.providers import (
    BytestringProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.shrinker import sort_key
from hypothesis.internal.entropy import deterministic_PRNG
from hypothesis.internal.escalation import (
    InterestingOrigin,
    current_pytest_item,
    format_exception,
    get_trimmed_traceback,
    is_hypothesis_file,
)
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import (
    OBSERVABILITY_COLLECT_COVERAGE,
    TESTCASE_CALLBACKS,
    _system_metadata,
    deliver_json_blob,
    make_testcase,
)
from hypothesis.internal.reflection import (
    convert_positional_arguments,
    define_function_signature,
    function_digest,
    get_pretty_function_description,
    get_signature,
    impersonate,
    is_mock,
    nicerepr,
    proxies,
    repr_call,
)
from hypothesis.internal.scrutineer import (
    MONITORING_TOOL_ID,
    Trace,
    Tracer,
    explanatory_lines,
    tractable_coverage_report,
)
from hypothesis.internal.validation import check_type
from hypothesis.reporting import (
    current_verbosity,
    report,
    verbose_report,
    with_reporter,
)
from hypothesis.statistics import describe_statistics, describe_targets, note_statistics
from hypothesis.strategies._internal.misc import NOTHING
from hypothesis.strategies._internal.strategies import (
    Ex,
    SearchStrategy,
    check_strategy,
)
from hypothesis.strategies._internal.utils import to_jsonable
from hypothesis.vendor.pretty import RepresentationPrinter
from hypothesis.version import __version__

if sys.version_info >= (3, 10):
    from types import EllipsisType
elif TYPE_CHECKING:
    from builtins import ellipsis as EllipsisType
else:  # pragma: no cover
    EllipsisType = type(Ellipsis)


TestFunc = TypeVar("TestFunc", bound=Callable)


running_under_pytest = False
pytest_shows_exceptiongroups = True
global_force_seed = None
_hypothesis_global_random = None


@dataclass
class Example:
    args: Any
    kwargs: Any
    # Plus two optional arguments for .xfail()
    raises: Any = field(default=None)
    reason: Any = field(default=None)


class example:
    """A decorator which ensures a specific example is always tested."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        if args and kwargs:
            raise InvalidArgument(
                "Cannot mix positional and keyword arguments for examples"
            )
        if not (args or kwargs):
            raise InvalidArgument("An example must provide at least one argument")

        self.hypothesis_explicit_examples: list[Example] = []
        self._this_example = Example(tuple(args), kwargs)

    def __call__(self, test: TestFunc) -> TestFunc:
        if not hasattr(test, "hypothesis_explicit_examples"):
            test.hypothesis_explicit_examples = self.hypothesis_explicit_examples  # type: ignore
        test.hypothesis_explicit_examples.append(self._this_example)  # type: ignore
        return test

    def xfail(
        self,
        condition: bool = True,  # noqa: FBT002
        *,
        reason: str = "",
        raises: Union[
            type[BaseException], tuple[type[BaseException], ...]
        ] = BaseException,
    ) -> "example":
        """Mark this example as an expected failure, similarly to
        :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.

        Expected-failing examples allow you to check that your test does fail on
        some examples, and therefore build confidence that *passing* tests are
        because your code is working, not because the test is missing something.

        .. code-block:: python

            @example(...).xfail()
            @example(...).xfail(reason="Prices must be non-negative")
            @example(...).xfail(raises=(KeyError, ValueError))
            @example(...).xfail(sys.version_info[:2] >= (3, 12), reason="needs py 3.12")
            @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
            def test(x):
                pass

        .. note::

            Expected-failing examples are handled separately from those generated
            by strategies, so you should usually ensure that there is no overlap.

            .. code-block:: python

                @example(x=1, y=0).xfail(raises=ZeroDivisionError)
                @given(x=st.just(1), y=st.integers())  # Missing `.filter(bool)`!
                def test_fraction(x, y):
                    # This test will try the explicit example and see it fail as
                    # expected, then go on to generate more examples from the
                    # strategy. If we happen to generate y=0, the test will fail
                    # because only the explicit example is treated as xfailing.
                    x / y
        """
        check_type(bool, condition, "condition")
        check_type(str, reason, "reason")
        if not (
            isinstance(raises, type) and issubclass(raises, BaseException)
        ) and not (
            isinstance(raises, tuple)
            and raises  # () -> expected to fail with no error, which is impossible
            and all(
                isinstance(r, type) and issubclass(r, BaseException) for r in raises
            )
        ):
            raise InvalidArgument(
                f"{raises=} must be an exception type or tuple of exception types"
            )
        if condition:
            self._this_example = dataclasses.replace(
                self._this_example, raises=raises, reason=reason
            )
        return self

    def via(self, whence: str, /) -> "example":
        """Attach a machine-readable label noting whence this example came.

        The idea is that tools will be able to add ``@example()`` cases for you, e.g.
        to maintain a high-coverage set of explicit examples, but also *remove* them
        if they become redundant - without ever deleting manually-added examples:

        .. code-block:: python

            # You can choose to annotate examples, or not, as you prefer
            @example(...)
            @example(...).via("regression test for issue #42")

            # The `hy-` prefix is reserved for automated tooling
            @example(...).via("hy-failing")
            @example(...).via("hy-coverage")
            @example(...).via("hy-target-$label")
            def test(x):
                pass
        """
        if not isinstance(whence, str):
            raise InvalidArgument(".via() must be passed a string")
        # This is deliberately a no-op at runtime; the tools operate on source code.
        return self
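# Illustrative usage (hypothetical test, not part of this module): explicit
# examples are stacked above @given, so the listed case is always tried in
# addition to the generated ones, e.g.
#
#     @example(x=0)
#     @given(x=st.integers())
#     def test_abs_is_non_negative(x):
#         assert abs(x) >= 0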



def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
    """Start the test execution from a specific seed.

    May be any hashable object. No exact meaning for seed is provided
    other than that for a fixed seed value Hypothesis will try the same
    actions (insofar as it can given external sources of non-
    determinism. e.g. timing and hash randomization).

    Overrides the derandomize setting, which is designed to enable
    deterministic builds rather than reproducing observed failures.

    """

    def accept(test):
        test._hypothesis_internal_use_seed = seed
        current_settings = getattr(test, "_hypothesis_internal_use_settings", None)
        test._hypothesis_internal_use_settings = Settings(
            current_settings, database=None
        )
        return test

    return accept
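# Illustrative usage (hypothetical test, not part of this module): pinning the
# seed makes Hypothesis try the same sequence of examples on every run, e.g.
#
#     @seed(1234)
#     @given(x=st.integers())
#     def test_is_idempotent(x):
#         assert f(f(x)) == f(x)  # `f` is a placeholder for the code under test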



def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
    """Run the example that corresponds to this data blob in order to reproduce
    a failure.

    A test with this decorator *always* runs only one example and always fails.
    If the provided example does not cause a failure, or is in some way invalid
    for this test, then this will fail with a DidNotReproduce error.

    This decorator is not intended to be a permanent addition to your test
    suite. It's simply some code you can add to ease reproduction of a problem
    in the event that you don't have access to the test database. Because of
    this, *no* compatibility guarantees are made between different versions of
    Hypothesis - its API may change arbitrarily from version to version.
    """

    def accept(test):
        test._hypothesis_internal_use_reproduce_failure = (version, blob)
        return test

    return accept
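# Illustrative usage: with print_blob enabled, a failure report prints a
# ready-made decorator (built by reproduction_decorator() below) which can be
# pasted onto the test verbatim, e.g.
#
#     @reproduce_failure("<version>", b"<blob from the failure report>")
#     @given(x=st.integers())
#     def test_that_failed(x): ...
#
# The version string and blob above are placeholders; use the printed values.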



def reproduction_decorator(choices: Iterable[ChoiceT]) -> str:
    return f"@reproduce_failure({__version__!r}, {encode_failure(choices)!r})"


def encode_failure(choices: Iterable[ChoiceT]) -> bytes:
    blob = choices_to_bytes(choices)
    compressed = zlib.compress(blob)
    if len(compressed) < len(blob):
        blob = b"\1" + compressed
    else:
        blob = b"\0" + blob
    return base64.b64encode(blob)


def decode_failure(blob: bytes) -> Sequence[ChoiceT]:
    try:
        decoded = base64.b64decode(blob)
    except Exception:
        raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None

    prefix = decoded[:1]
    if prefix == b"\0":
        decoded = decoded[1:]
    elif prefix == b"\1":
        try:
            decoded = zlib.decompress(decoded[1:])
        except zlib.error as err:
            raise InvalidArgument(
                f"Invalid zlib compression for blob {blob!r}"
            ) from err
    else:
        raise InvalidArgument(
            f"Could not decode blob {blob!r}: Invalid start byte {prefix!r}"
        )

    choices = choices_from_bytes(decoded)
    if choices is None:
        raise InvalidArgument(f"Invalid serialized choice sequence for blob {blob!r}")

    return choices
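# Round-trip note: encode_failure() returns base64 text whose first decoded
# byte is a format tag - b"\x01" for a zlib-compressed payload, b"\x00" for a
# raw payload - and decode_failure() reverses both steps, so for any valid
# choice sequence decode_failure(encode_failure(choices)) recovers `choices`.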



def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
    @impersonate(test)
    def wrapped_test(*arguments, **kwargs):  # pragma: no cover  # coverage limitation
        raise exc(message)

    wrapped_test.is_hypothesis_test = True
    wrapped_test.hypothesis = HypothesisHandle(
        inner_test=test,
        _get_fuzz_target=wrapped_test,
        _given_kwargs=given_kwargs,
    )
    return wrapped_test


def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
    """Check the arguments to ``@given`` for basic usage constraints.

    Most errors are not raised immediately; instead we return a dummy test
    function that will raise the appropriate error if it is actually called.
    When the user runs a subset of tests (e.g. via ``pytest -k``), errors will
    only be reported for tests that actually ran.
    """
    invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)

    if not (given_arguments or given_kwargs):
        return invalid("given must be called with at least one argument")

    params = list(original_sig.parameters.values())
    pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
    kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]
    if given_arguments and params != pos_params:
        return invalid(
            "positional arguments to @given are not supported with varargs, "
            "varkeywords, positional-only, or keyword-only arguments"
        )

    if len(given_arguments) > len(pos_params):
        return invalid(
            f"Too many positional arguments for {test.__name__}() were passed to "
            f"@given - expected at most {len(pos_params)} "
            f"arguments, but got {len(given_arguments)} {given_arguments!r}"
        )

    if ... in given_arguments:
        return invalid(
            "... was passed as a positional argument to @given, but may only be "
            "passed as a keyword argument or as the sole argument of @given"
        )

    if given_arguments and given_kwargs:
        return invalid("cannot mix positional and keyword arguments to @given")
    extra_kwargs = [
        k for k in given_kwargs if k not in {p.name for p in pos_params + kwonly_params}
    ]
    if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD):
        arg = extra_kwargs[0]
        extra = ""
        if arg in all_settings:
            extra = f". Did you mean @settings({arg}={given_kwargs[arg]!r})?"
        return invalid(
            f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
            f"from `{arg}={given_kwargs[arg]!r}` in @given{extra}"
        )
    if any(p.default is not p.empty for p in params):
        return invalid("Cannot apply @given to a function with defaults.")

    # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
    # provide a much more helpful error message for people e.g. using the Ghostwriter.
    empty = [
        f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
    ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING]
    if empty:
        strats = "strategies" if len(empty) > 1 else "strategy"
        return invalid(
            f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
            exc=Unsatisfiable,
        )


def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
    assert isinstance(state, StateForActualGivenExecution)
    posargs = [
        p.name
        for p in original_sig.parameters.values()
        if p.kind is p.POSITIONAL_OR_KEYWORD
    ]

    for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
        assert isinstance(example, Example)
        # All of this validation is to check that @example() got "the same" arguments
        # as @given, i.e. corresponding to the same parameters, even though they might
        # be any mixture of positional and keyword arguments.
        if example.args:
            assert not example.kwargs
            if any(
                p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
            ):
                raise InvalidArgument(
                    "Cannot pass positional arguments to @example() when decorating "
                    "a test function which has positional-only parameters."
                )
            if len(example.args) > len(posargs):
                raise InvalidArgument(
                    "example has too many arguments for test. Expected at most "
                    f"{len(posargs)} but got {len(example.args)}"
                )
            example_kwargs = dict(zip(posargs[-len(example.args) :], example.args))
        else:
            example_kwargs = dict(example.kwargs)
        given_kws = ", ".join(
            repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
        )
        example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
        if given_kws != example_kws:
            raise InvalidArgument(
                f"Inconsistent args: @given() got strategies for {given_kws}, "
                f"but @example() got arguments for {example_kws}"
            ) from None


        # This is certainly true because the example_kwargs exactly match the params
        # reserved by @given(), which are then removed from the function signature.

        assert set(example_kwargs).isdisjoint(kwargs)
        example_kwargs.update(kwargs)

        if Phase.explicit not in state.settings.phases:
            continue

        with local_settings(state.settings):
            fragments_reported = []
            empty_data = ConjectureData.for_choices([])
            try:
                execute_example = partial(
                    state.execute_once,
                    empty_data,
                    is_final=True,
                    print_example=True,
                    example_kwargs=example_kwargs,
                )
                with with_reporter(fragments_reported.append):
                    if example.raises is None:
                        execute_example()
                    else:
                        # @example(...).xfail(...)
                        bits = ", ".join(nicerepr(x) for x in arguments) + ", ".join(
                            f"{k}={nicerepr(v)}" for k, v in example_kwargs.items()
                        )
                        try:
                            execute_example()
                        except failure_exceptions_to_catch() as err:
                            if not isinstance(err, example.raises):
                                raise
                            # Save a string form of this example; we'll warn if it's
                            # ever generated by the strategy (which can't be xfailed)
                            state.xfail_example_reprs.add(
                                repr_call(state.test, arguments, example_kwargs)
                            )
                        except example.raises as err:
                            # We'd usually check this as early as possible, but it's
                            # possible for failure_exceptions_to_catch() to grow when
                            # e.g. pytest is imported between import- and test-time.
                            raise InvalidArgument(
                                f"@example({bits}) raised an expected {err!r}, "
                                "but Hypothesis does not treat this as a test failure"
                            ) from err
                        else:
                            # Unexpectedly passing; always raise an error in this case.
                            reason = f" because {example.reason}" * bool(example.reason)
                            if example.raises is BaseException:
                                name = "exception"  # special-case no raises= arg
                            elif not isinstance(example.raises, tuple):
                                name = example.raises.__name__
                            elif len(example.raises) == 1:
                                name = example.raises[0].__name__
                            else:
                                name = (
                                    ", ".join(ex.__name__ for ex in example.raises[:-1])
                                    + f", or {example.raises[-1].__name__}"
                                )
                            vowel = name.upper()[0] in "AEIOU"
                            raise AssertionError(
                                f"Expected a{'n' * vowel} {name} from @example({bits})"
                                f"{reason}, but no exception was raised."
                            )
            except UnsatisfiedAssumption:
                # Odd though it seems, we deliberately support explicit examples that
                # are then rejected by a call to `assume()`. As well as iterative
                # development, this is rather useful to replay Hypothesis' part of
                # a saved failure when other arguments are supplied by e.g. pytest.
                # See https://github.com/HypothesisWorks/hypothesis/issues/2125
                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
            except BaseException as err:
                # In order to support reporting of multiple failing examples, we yield
                # each of the (report text, error) pairs we find back to the top-level
                # runner. This also ensures that user-facing stack traces have as few
                # frames of Hypothesis internals as possible.
                err = err.with_traceback(get_trimmed_traceback())

                # One user error - whether misunderstanding or typo - we've seen a few
                # times is to pass strategies to @example() where values are expected.
                # Checking is easy, and false-positives not much of a problem, so:
                if isinstance(err, failure_exceptions_to_catch()) and any(
                    isinstance(arg, SearchStrategy)
                    for arg in example.args + tuple(example.kwargs.values())
                ):
                    new = HypothesisWarning(
                        "The @example() decorator expects to be passed values, but "
                        "you passed strategies instead. See https://hypothesis."
                        "readthedocs.io/en/latest/reproducing.html for details."
                    )
                    new.__cause__ = err
                    err = new

                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
                yield (fragments_reported, err)
                if (
                    state.settings.report_multiple_bugs
                    and pytest_shows_exceptiongroups
                    and isinstance(err, failure_exceptions_to_catch())
                    and not isinstance(err, skip_exceptions_to_reraise())
                ):
                    continue
                break
            finally:
                if fragments_reported:
                    assert fragments_reported[0].startswith("Falsifying example")
                    fragments_reported[0] = fragments_reported[0].replace(
                        "Falsifying example", "Falsifying explicit example", 1
                    )

                tc = make_testcase(
                    start_timestamp=state._start_timestamp,
                    test_name_or_nodeid=state.test_identifier,
                    data=empty_data,
                    how_generated="explicit example",
                    string_repr=state._string_repr,
                    timing=state._timing_features,
                )
                deliver_json_blob(tc)

            if fragments_reported:
                verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
                for f in fragments_reported[1:]:
                    verbose_report(f)


def get_random_for_wrapped_test(test, wrapped_test):
    settings = wrapped_test._hypothesis_internal_use_settings
    wrapped_test._hypothesis_internal_use_generated_seed = None

    if wrapped_test._hypothesis_internal_use_seed is not None:
        return Random(wrapped_test._hypothesis_internal_use_seed)
    elif settings.derandomize:
        return Random(int_from_bytes(function_digest(test)))
    elif global_force_seed is not None:
        return Random(global_force_seed)
    else:
        global _hypothesis_global_random
        if _hypothesis_global_random is None:  # pragma: no cover
            _hypothesis_global_random = Random()
        seed = _hypothesis_global_random.getrandbits(128)
        wrapped_test._hypothesis_internal_use_generated_seed = seed
        return Random(seed)


@dataclass
class Stuff:
    selfy: Any
    args: tuple
    kwargs: dict
    given_kwargs: dict


def process_arguments_to_given(
    wrapped_test: Any,
    arguments: Sequence[object],
    kwargs: dict[str, object],
    given_kwargs: dict[str, SearchStrategy],
    params: dict[str, Parameter],
) -> tuple[Sequence[object], dict[str, object], Stuff]:
    selfy = None
    arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)

    # If the test function is a method of some kind, the bound object
    # will be the first named argument if there are any, otherwise the
    # first vararg (if any).
    posargs = [p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD]
    if posargs:
        selfy = kwargs.get(posargs[0])
    elif arguments:
        selfy = arguments[0]

    # Ensure that we don't mistake mocks for self here.
    # This can cause the mock to be used as the test runner.
    if is_mock(selfy):
        selfy = None

    arguments = tuple(arguments)

    with ensure_free_stackframes():
        for k, s in given_kwargs.items():
            check_strategy(s, name=k)
            s.validate()

    stuff = Stuff(selfy=selfy, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs)

    return arguments, kwargs, stuff


def skip_exceptions_to_reraise():
    """Return a tuple of exceptions meaning 'skip this test', to re-raise.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request adding
    it to this function and to tests/cover/test_lazy_import.py
    """
    # This is a set because nose may simply re-export unittest.SkipTest
    exceptions = set()
    # We use this sys.modules trick to avoid importing libraries -
    # you can't be an instance of a type from an unimported module!
    # This is fast enough that we don't need to cache the result,
    # and more importantly it avoids possible side-effects :-)
    if "unittest" in sys.modules:
        exceptions.add(sys.modules["unittest"].SkipTest)
    if "unittest2" in sys.modules:
        exceptions.add(sys.modules["unittest2"].SkipTest)
    if "nose" in sys.modules:
        exceptions.add(sys.modules["nose"].SkipTest)
    if "_pytest" in sys.modules:
        exceptions.add(sys.modules["_pytest"].outcomes.Skipped)
    return tuple(sorted(exceptions, key=str))


def failure_exceptions_to_catch() -> tuple[type[BaseException], ...]:
    """Return a tuple of exceptions meaning 'this test has failed', to catch.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request.
    """
    # While SystemExit and GeneratorExit are instances of BaseException, we also
    # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
    # them as standard exceptions, check for flakiness, etc.
    # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
    exceptions = [Exception, SystemExit, GeneratorExit]
    if "_pytest" in sys.modules:
        exceptions.append(sys.modules["_pytest"].outcomes.Failed)
    return tuple(exceptions)


def new_given_signature(original_sig, given_kwargs):
    """Make an updated signature for the wrapped test."""
    return original_sig.replace(
        parameters=[
            p
            for p in original_sig.parameters.values()
            if not (
                p.name in given_kwargs
                and p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
            )
        ],
        return_annotation=None,
    )


def default_executor(data, function):
    return function(data)


def get_executor(runner):
    try:
        execute_example = runner.execute_example
    except AttributeError:
        pass
    else:
        return lambda data, function: execute_example(partial(function, data))

    if hasattr(runner, "setup_example") or hasattr(runner, "teardown_example"):
        setup = getattr(runner, "setup_example", None) or (lambda: None)
        teardown = getattr(runner, "teardown_example", None) or (lambda ex: None)

        def execute(data, function):
            token = None
            try:
                token = setup()
                return function(data)
            finally:
                teardown(token)

        return execute

    return default_executor
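# Illustrative runner hooks (hypothetical class, not part of this module):
# get_executor() looks for either an execute_example() method, or paired
# setup_example()/teardown_example() methods, on the object a test is bound
# to, e.g.
#
#     class TestWithHooks(TestCase):
#         def setup_example(self):
#             self.resource = open_resource()  # hypothetical helper
#
#         def teardown_example(self, example):
#             self.resource.close()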



@contextlib.contextmanager
def unwrap_markers_from_group() -> Generator[None, None, None]:
    # This function is a crude solution, a better way of resolving it would probably
    # be to rewrite a bunch of exception handlers to use except*.
    T = TypeVar("T", bound=BaseException)

    def _flatten_group(excgroup: BaseExceptionGroup[T]) -> list[T]:
        found_exceptions: list[T] = []
        for exc in excgroup.exceptions:
            if isinstance(exc, BaseExceptionGroup):
                found_exceptions.extend(_flatten_group(exc))
            else:
                found_exceptions.append(exc)
        return found_exceptions

    try:
        yield
    except BaseExceptionGroup as excgroup:
        frozen_exceptions, non_frozen_exceptions = excgroup.split(Frozen)

        # group only contains Frozen, reraise the group
        # it doesn't matter what we raise, since any exceptions get disregarded
        # and reraised as StopTest if data got frozen.
        if non_frozen_exceptions is None:
            raise
        # in all other cases they are discarded

        # Can RewindRecursive end up in this group?
        _, user_exceptions = non_frozen_exceptions.split(
            lambda e: isinstance(e, (StopTest, HypothesisException))
        )

        # this might contain marker exceptions, or internal errors, but not frozen.
        if user_exceptions is not None:
            raise

        # single marker exception - reraise it
        flattened_non_frozen_exceptions: list[BaseException] = _flatten_group(
            non_frozen_exceptions
        )
        if len(flattened_non_frozen_exceptions) == 1:
            e = flattened_non_frozen_exceptions[0]
            # preserve the cause of the original exception to not hinder debugging
            # note that __context__ is still lost though
            raise e from e.__cause__

        # multiple marker exceptions. If we re-raise the whole group we break
        # a bunch of logic so ....?
        stoptests, non_stoptests = non_frozen_exceptions.split(StopTest)

        # TODO: stoptest+hypothesisexception ...? Is it possible? If so, what do?

        if non_stoptests:
            # TODO: multiple marker exceptions is easy to produce, but the logic in the
            # engine does not handle it... so we just reraise the first one for now.
            e = _flatten_group(non_stoptests)[0]
            raise e from e.__cause__
        assert stoptests is not None

        # multiple stoptests: raising the one with the lowest testcounter
        raise min(_flatten_group(stoptests), key=lambda s_e: s_e.testcounter)


class StateForActualGivenExecution:
    def __init__(self, stuff, test, settings, random, wrapped_test):
        self.test_runner = get_executor(stuff.selfy)
        self.stuff = stuff
        self.settings = settings
        self.last_exception = None
        self.falsifying_examples = ()
        self.random = random
        self.ever_executed = False

        self.is_find = getattr(wrapped_test, "_hypothesis_internal_is_find", False)
        self.wrapped_test = wrapped_test
        self.xfail_example_reprs = set()

        self.test = test

        self.print_given_args = getattr(
            wrapped_test, "_hypothesis_internal_print_given_args", True
        )

        self.files_to_propagate = set()
        self.failed_normally = False
        self.failed_due_to_deadline = False

        self.explain_traces = defaultdict(set)
        self._start_timestamp = time.time()
        self._string_repr = ""
        self._timing_features = {}

    @property
    def test_identifier(self):
        return getattr(
            current_pytest_item.value, "nodeid", None
        ) or get_pretty_function_description(self.wrapped_test)

    def _should_trace(self):
        _trace_obs = TESTCASE_CALLBACKS and OBSERVABILITY_COLLECT_COVERAGE
        _trace_failure = (
            self.failed_normally
            and not self.failed_due_to_deadline
            and {Phase.shrink, Phase.explain}.issubset(self.settings.phases)
        )
        return _trace_obs or _trace_failure


    def execute_once(
        self,
        data,
        *,
        print_example=False,
        is_final=False,
        expected_failure=None,
        example_kwargs=None,
    ):
        """Run the test function once, using ``data`` as input.

        If the test raises an exception, it will propagate through to the
        caller of this method. Depending on its type, this could represent
        an ordinary test failure, or a fatal error, or a control exception.

        If this method returns normally, the test might have passed, or
        it might have placed ``data`` in an unsuccessful state and then
        swallowed the corresponding control exception.
        """

        self.ever_executed = True
        data.is_find = self.is_find

        self._string_repr = ""
        text_repr = None
        if self.settings.deadline is None and not TESTCASE_CALLBACKS:

            @proxies(self.test)
            def test(*args, **kwargs):
                with unwrap_markers_from_group(), ensure_free_stackframes():
                    return self.test(*args, **kwargs)

        else:

            @proxies(self.test)
            def test(*args, **kwargs):
                arg_drawtime = math.fsum(data.draw_times.values())
                arg_stateful = math.fsum(data._stateful_run_times.values())
                arg_gctime = gc_cumulative_time()
                start = time.perf_counter()
                try:
                    with unwrap_markers_from_group(), ensure_free_stackframes():
                        result = self.test(*args, **kwargs)
                finally:
                    finish = time.perf_counter()
                    in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
                    in_stateful = (
                        math.fsum(data._stateful_run_times.values()) - arg_stateful
                    )
                    in_gctime = gc_cumulative_time() - arg_gctime
                    runtime = finish - start - in_drawtime - in_stateful - in_gctime
                    self._timing_features = {
                        "execute:test": runtime,
                        "overall:gc": in_gctime,
                        **data.draw_times,
                        **data._stateful_run_times,
                    }

                if (current_deadline := self.settings.deadline) is not None:
                    if not is_final:
                        current_deadline = (current_deadline // 4) * 5
                    if runtime >= current_deadline.total_seconds():
                        raise DeadlineExceeded(
                            datetime.timedelta(seconds=runtime), self.settings.deadline
                        )
                return result

        def run(data):
            # Set up dynamic context needed by a single test run.
            if self.stuff.selfy is not None:
                data.hypothesis_runner = self.stuff.selfy
            # Generate all arguments to the test function.
            args = self.stuff.args
            kwargs = dict(self.stuff.kwargs)
            if example_kwargs is None:
                kw, argslices = context.prep_args_kwargs_from_strategies(
                    self.stuff.given_kwargs
                )
            else:
                kw = example_kwargs
                argslices = {}
            kwargs.update(kw)
            if expected_failure is not None:
                nonlocal text_repr
                text_repr = repr_call(test, args, kwargs)

            if print_example or current_verbosity() >= Verbosity.verbose:
                printer = RepresentationPrinter(context=context)
                if print_example:
                    printer.text("Falsifying example:")
                else:
                    printer.text("Trying example:")

                if self.print_given_args:
                    printer.text(" ")
                    printer.repr_call(
                        test.__name__,
                        args,
                        kwargs,
                        force_split=True,
                        arg_slices=argslices,
                        leading_comment=(
                            "# " + context.data.slice_comments[(0, 0)]
                            if (0, 0) in context.data.slice_comments
                            else None
                        ),
                        avoid_realization=data.provider.avoid_realization,
                    )
                report(printer.getvalue())

            if TESTCASE_CALLBACKS:
                printer = RepresentationPrinter(context=context)
                printer.repr_call(
                    test.__name__,
                    args,
                    kwargs,
                    force_split=True,
                    arg_slices=argslices,
                    leading_comment=(
                        "# " + context.data.slice_comments[(0, 0)]
                        if (0, 0) in context.data.slice_comments
                        else None
                    ),
                    avoid_realization=data.provider.avoid_realization,
                )
                self._string_repr = printer.getvalue()
                data._observability_arguments = {
                    k: to_jsonable(v, avoid_realization=data.provider.avoid_realization)
                    for k, v in [*enumerate(args), *kwargs.items()]
                }

            try:
                return test(*args, **kwargs)
            except TypeError as e:
                # If we sampled from a sequence of strategies, AND failed with a
                # TypeError, *AND that exception mentions SearchStrategy*, add a note:
                if (
                    "SearchStrategy" in str(e)
                    and data._sampled_from_all_strategies_elements_message is not None
                ):
                    msg, format_arg = data._sampled_from_all_strategies_elements_message
                    add_note(e, msg.format(format_arg))
                raise
            finally:
                if parts := getattr(data, "_stateful_repr_parts", None):
                    self._string_repr = "\n".join(parts)

        # self.test_runner can include the execute_example method, or setup/teardown
        # _example, so it's important to get the PRNG and build context in place first.
        with (
            local_settings(self.settings),
            deterministic_PRNG(),
            BuildContext(data, is_final=is_final) as context,
        ):
            # providers may throw in per_case_context_fn, and we'd like
            # `result` to still be set in these cases.
            result = None
            with data.provider.per_test_case_context_manager():
                # Run the test function once, via the executor hook.
                # In most cases this will delegate straight to `run(data)`.
                result = self.test_runner(data, run)

        # If a failure was expected, it should have been raised already, so
        # instead raise an appropriate diagnostic error.
        if expected_failure is not None:
            exception, traceback = expected_failure
            if isinstance(exception, DeadlineExceeded) and (
                runtime_secs := math.fsum(
                    v
                    for k, v in self._timing_features.items()
                    if k.startswith("execute:")
                )
            ):
                report(
                    "Unreliable test timings! On an initial run, this "
                    f"test took {exception.runtime.total_seconds() * 1000:.2f}ms, "
                    "which exceeded the deadline of "
                    f"{self.settings.deadline.total_seconds() * 1000:.2f}ms, but "
                    f"on a subsequent run it took {runtime_secs * 1000:.2f} ms, "
                    "which did not. If you expect this sort of "
                    "variability in your test timings, consider turning "
                    "deadlines off for this test by setting deadline=None."
                )
            else:
                report("Failed to reproduce exception. Expected: \n" + traceback)
            raise FlakyFailure(
                f"Hypothesis {text_repr} produces unreliable results: "
                "Falsified on the first call but did not on a subsequent one",
                [exception],
            )
        return result


    def _flaky_replay_to_failure(
        self, err: FlakyReplay, context: BaseException
    ) -> FlakyFailure:
        # Note that in the mark_interesting case, _context_ itself
        # is part of err._interesting_examples - but it's not in
        # _runner.interesting_examples - this is fine, as the context
        # (i.e., immediate exception) is appended.
        interesting_examples = [
            self._runner.interesting_examples[io]
            for io in err._interesting_origins
            if io in self._runner.interesting_examples
        ]
        exceptions = [ie.expected_exception for ie in interesting_examples]
        exceptions.append(context)  # the immediate exception
        return FlakyFailure(err.reason, exceptions)


    def _execute_once_for_engine(self, data: ConjectureData) -> None:
        """Wrapper around ``execute_once`` that intercepts test failure
        exceptions and single-test control exceptions, and turns them into
        appropriate method calls to `data` instead.

        This allows the engine to assume that any exception other than
        ``StopTest`` must be a fatal error, and should stop the entire engine.
        """
        trace: Trace = set()
        try:
            with Tracer(should_trace=self._should_trace()) as tracer:
                try:
                    result = self.execute_once(data)
                    if (
                        data.status == Status.VALID and tracer.branches
                    ):  # pragma: no cover
                        # This is in fact covered by our *non-coverage* tests, but due
                        # to the settrace() contention *not* by our coverage tests.
                        self.explain_traces[None].add(frozenset(tracer.branches))
                finally:
                    trace = tracer.branches
            if result is not None:
                fail_health_check(
                    self.settings,
                    "Tests run under @given should return None, but "
                    f"{self.test.__name__} returned {result!r} instead.",
                    HealthCheck.return_value,
                )
        except UnsatisfiedAssumption as e:
            # An "assume" check failed, so instead we inform the engine that
            # this test run was invalid.
            try:
                data.mark_invalid(e.reason)
            except FlakyReplay as err:
                # This was unexpected, meaning that the assume was flaky.
                # Report it as such.
                raise self._flaky_replay_to_failure(err, e) from None
        except (StopTest, BackendCannotProceed):
            # The engine knows how to handle this control exception, so it's
            # OK to re-raise it.
            raise
        except (
            FailedHealthCheck,
            *skip_exceptions_to_reraise(),
        ):
            # These are fatal errors or control exceptions that should stop the
            # engine, so we re-raise them.
            raise
        except failure_exceptions_to_catch() as e:
            # If an unhandled (i.e., non-Hypothesis) error was raised by
            # Hypothesis-internal code, re-raise it as a fatal error instead
            # of treating it as a test failure.
            if isinstance(e, BaseExceptionGroup) and len(e.exceptions) == 1:
                # When a naked exception is implicitly wrapped in an ExceptionGroup
                # due to a re-raising "except*", the ExceptionGroup is constructed in
                # the caller's stack frame (see #4183). This workaround is specifically
                # for implicit wrapping of naked exceptions by "except*", since explicit
                # raising of ExceptionGroup gets the proper traceback in the first place
                # - there's no need to handle hierarchical groups here, at least if no
                # such implicit wrapping happens inside hypothesis code (we only care
                # about the hypothesis-or-not distinction).
                #
                # 01-25-2025: this was patched to give the correct
                # stacktrace in cpython https://github.com/python/cpython/issues/128799.
                # can remove once python3.11 is EOL.
                tb = e.exceptions[0].__traceback__ or e.__traceback__
            else:
                tb = e.__traceback__
            filepath = traceback.extract_tb(tb)[-1][0]
            if is_hypothesis_file(filepath) and not isinstance(e, HypothesisException):
                raise

            if data.frozen:
                # This can happen if an error occurred in a finally
                # block somewhere, suppressing our original StopTest.
                # We raise a new one here to resume normal operation.
                raise StopTest(data.testcounter) from e
            else:
                # The test failed by raising an exception, so we inform the
                # engine that this test run was interesting. This is the normal
                # path for test runs that fail.
                tb = get_trimmed_traceback()
                data.expected_traceback = format_exception(e, tb)
                data.expected_exception = e
                assert data.expected_traceback is not None  # for mypy
                verbose_report(data.expected_traceback)

                self.failed_normally = True

                interesting_origin = InterestingOrigin.from_exception(e)
                if trace:  # pragma: no cover
                    # Trace collection is explicitly disabled under coverage.
                    self.explain_traces[interesting_origin].add(frozenset(trace))
                if interesting_origin[0] == DeadlineExceeded:
                    self.failed_due_to_deadline = True
                    self.explain_traces.clear()
                try:
                    data.mark_interesting(interesting_origin)
                except FlakyReplay as err:
                    raise self._flaky_replay_to_failure(err, e) from None

        finally:
            # Conditional here so we can save some time constructing the payload; in
            # other cases (without coverage) it's cheap enough to do that regardless.
            if TESTCASE_CALLBACKS:
                if runner := getattr(self, "_runner", None):
                    phase = runner._current_phase
                else:  # pragma: no cover  # in case of messing with internals
                    if self.failed_normally or self.failed_due_to_deadline:
                        phase = "shrink"
                    else:
                        phase = "unknown"
                backend_desc = f", using backend={self.settings.backend!r}" * (
                    self.settings.backend != "hypothesis"
                    and not getattr(runner, "_switch_to_hypothesis_provider", False)
                )
                try:
                    data._observability_args = data.provider.realize(
                        data._observability_args
                    )
                    self._string_repr = data.provider.realize(self._string_repr)
                except BackendCannotProceed:
                    data._observability_args = {}
                    self._string_repr = "<backend failed to realize symbolic arguments>"

                tc = make_testcase(
                    start_timestamp=self._start_timestamp,
                    test_name_or_nodeid=self.test_identifier,
                    data=data,
                    how_generated=f"during {phase} phase{backend_desc}",
                    string_repr=self._string_repr,
                    arguments=data._observability_args,
                    timing=self._timing_features,
                    coverage=tractable_coverage_report(trace) or None,
                    phase=phase,
                    backend_metadata=data.provider.observe_test_case(),
                )
                deliver_json_blob(tc)
                for msg in data.provider.observe_information_messages(
                    lifetime="test_case"
                ):
                    self._deliver_information_message(**msg)
            self._timing_features = {}


    def _deliver_information_message(
        self, *, type: str, title: str, content: Union[str, dict]
    ) -> None:
        deliver_json_blob(
            {
                "type": type,
                "run_start": self._start_timestamp,
                "property": self.test_identifier,
                "title": title,
                "content": content,
            }
        )


    def run_engine(self):
        """Run the test function many times, on database input and generated
        input, using the Conjecture engine.
        """
        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True
        try:
            database_key = self.wrapped_test._hypothesis_internal_database_key
        except AttributeError:
            if global_force_seed is None:
                database_key = function_digest(self.test)
            else:
                database_key = None

        runner = self._runner = ConjectureRunner(
            self._execute_once_for_engine,
            settings=self.settings,
            random=self.random,
            database_key=database_key,
        )
        # Use the Conjecture engine to run the test function many times
        # on different inputs.
        runner.run()
        note_statistics(runner.statistics)
        if TESTCASE_CALLBACKS:
            self._deliver_information_message(
                type="info",
                title="Hypothesis Statistics",
                content=describe_statistics(runner.statistics),
            )
            for msg in (
                p if isinstance(p := runner.provider, PrimitiveProvider) else p(None)
            ).observe_information_messages(lifetime="test_function"):
                self._deliver_information_message(**msg)

        if runner.call_count == 0:
            return
        if runner.interesting_examples:
            self.falsifying_examples = sorted(
                runner.interesting_examples.values(),
                key=lambda d: sort_key(d.nodes),
                reverse=True,
            )
        else:
            if runner.valid_examples == 0:
                explanations = []
                # use a somewhat arbitrary cutoff to avoid recommending spurious
                # fixes.
                # eg, a few invalid examples from internal filters when the
                # problem is the user generating large inputs, or a
                # few overruns during internal mutation when the problem is
                # impossible user filters/assumes.
                if runner.invalid_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.invalid_examples} of {runner.call_count} "
                        "examples failed a .filter() or assume() condition. Try "
                        "making your filters or assumes less strict, or rewrite "
                        "using strategy parameters: "
                        "st.integers().filter(lambda x: x > 0) fails less often "
                        "(that is, never) when rewritten as st.integers(min_value=1)."
                    )
                if runner.overrun_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.overrun_examples} of {runner.call_count} "
                        "examples were too large to finish generating; try "
                        "reducing the typical size of your inputs?"
                    )
                rep = get_pretty_function_description(self.test)
                raise Unsatisfiable(
                    f"Unable to satisfy assumptions of {rep}. "
                    f"{' Also, '.join(explanations)}"
                )

        # If we have not traced executions, warn about that now (but only when
        # we'd expect to do so reliably, i.e. on CPython>=3.12)
        if (
            sys.version_info[:2] >= (3, 12)
            and not PYPY
            and self._should_trace()
            and not Tracer.can_trace()
        ):  # pragma: no cover
            # actually covered by our tests, but only on >= 3.12
            warnings.warn(
                "avoiding tracing test function because tool id "
                f"{MONITORING_TOOL_ID} is already taken by tool "
                f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
                HypothesisWarning,
                stacklevel=3,
            )


        if not self.falsifying_examples:
            return
        elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
            # Pretend that we only found one failure, by discarding the others.
            del self.falsifying_examples[:-1]

        # The engine found one or more failures, so we need to reproduce and
        # report them.

        errors_to_report = []

        report_lines = describe_targets(runner.best_observed_targets)
        if report_lines:
            report_lines.append("")

        explanations = explanatory_lines(self.explain_traces, self.settings)
        for falsifying_example in self.falsifying_examples:
            fragments = []

            ran_example = runner.new_conjecture_data(
                falsifying_example.choices, max_choices=len(falsifying_example.choices)
            )
            ran_example.slice_comments = falsifying_example.slice_comments
            tb = None
            origin = None
            assert falsifying_example.expected_exception is not None
            assert falsifying_example.expected_traceback is not None
            try:
                with with_reporter(fragments.append):
                    self.execute_once(
                        ran_example,
                        print_example=not self.is_find,
                        is_final=True,
                        expected_failure=(
                            falsifying_example.expected_exception,
                            falsifying_example.expected_traceback,
                        ),
                    )
            except StopTest as e:
                # Link the expected exception from the first run. Not sure
                # how to access the current exception, if it failed
                # differently on this run. In fact, in the only known
                # reproducer, the StopTest is caused by OVERRUN before the
                # test is even executed. Possibly because all initial examples
                # failed until the final non-traced replay, and something was
                # exhausted? Possibly a FIXME, but sufficiently weird to
                # ignore for now.
                err = FlakyFailure(
                    "Inconsistent results: An example failed on the "
                    "first run but now succeeds (or fails with another "
                    "error, or is for some reason not runnable).",
                    # (note: e is a BaseException)
                    [falsifying_example.expected_exception or e],
                )
                errors_to_report.append((fragments, err))
            except UnsatisfiedAssumption as e:  # pragma: no cover  # ironically flaky
                err = FlakyFailure(
                    "Unreliable assumption: An example which satisfied "
                    "assumptions on the first run now fails it.",
                    [e],
                )
                errors_to_report.append((fragments, err))
            except BaseException as e:
                # If we have anything for explain-mode, this is the time to report.
                fragments.extend(explanations[falsifying_example.interesting_origin])
                errors_to_report.append(
                    (fragments, e.with_traceback(get_trimmed_traceback()))
                )
                tb = format_exception(e, get_trimmed_traceback(e))
                origin = InterestingOrigin.from_exception(e)
            else:
                # execute_once() will always raise either the expected error, or Flaky.
                raise NotImplementedError("This should be unreachable")
            finally:
                # log our observability line for the final failing example
                tc = {
                    "type": "test_case",
                    "run_start": self._start_timestamp,
                    "property": self.test_identifier,
                    "status": "passed" if sys.exc_info()[0] else "failed",
                    "status_reason": str(origin or "unexpected/flaky pass"),
                    "representation": self._string_repr,
                    "arguments": ran_example._observability_args,
                    "how_generated": "minimal failing example",
                    "features": {
                        **{
                            f"target:{k}".strip(":"): v
                            for k, v in ran_example.target_observations.items()
                        },
                        **ran_example.events,
                    },
                    "timing": self._timing_features,
                    "coverage": None,  # Not recorded when we're replaying the MFE
                    "metadata": {
                        "traceback": tb,
                        "predicates": dict(ran_example._observability_predicates),
                        **_system_metadata(),
                    },
                }
                deliver_json_blob(tc)
                # Whether or not replay actually raised the exception again, we want
                # to print the reproduce_failure decorator for the failing example.
                if self.settings.print_blob:
                    fragments.append(
                        "\nYou can reproduce this example by temporarily adding "
                        f"{reproduction_decorator(falsifying_example.choices)} "
                        "as a decorator on your test case"
                    )
                # Mostly useful for ``find`` and ensuring that objects that
                # hold on to a reference to ``data`` know that it's now been
                # finished and they can't draw more data from it.
                ran_example.freeze()  # pragma: no branch
                # No branch is possible here because we never have an active exception.
        _raise_to_user(
            errors_to_report,
            self.settings,
            report_lines,
            # A backend might report a failure and then report verified afterwards,
            # which is to be interpreted as "there are no more failures *other
            # than what we already reported*". Do not report this as unsound.
            unsound_backend=(
                runner._verified_by
                if runner._verified_by and not runner._backend_found_failure
                else None
            ),
        )




def _raise_to_user(
    errors_to_report, settings, target_lines, trailer="", *, unsound_backend=None
):
    """Helper function for attaching notes and grouping multiple errors."""
    failing_prefix = "Falsifying example: "
    ls = []
    for fragments, err in errors_to_report:
        for note in fragments:
            add_note(err, note)
            if note.startswith(failing_prefix):
                ls.append(note.removeprefix(failing_prefix))
    if current_pytest_item.value:
        current_pytest_item.value._hypothesis_failing_examples = ls

    if len(errors_to_report) == 1:
        _, the_error_hypothesis_found = errors_to_report[0]
    else:
        assert errors_to_report
        the_error_hypothesis_found = BaseExceptionGroup(
            f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
            [e for _, e in errors_to_report],
        )

    if settings.verbosity >= Verbosity.normal:
        for line in target_lines:
            add_note(the_error_hypothesis_found, line)

    if unsound_backend:
        msg = f"backend={unsound_backend!r} claimed to verify this test passes - please send them a bug report!"
        add_note(err, msg)

    raise the_error_hypothesis_found




@contextlib.contextmanager
def fake_subTest(self, msg=None, **__):
    """Monkeypatch for `unittest.TestCase.subTest` during `@given`.

    If we don't patch this out, each failing example is reported as a
    separate failing test by the unittest test runner, which is
    obviously incorrect. We therefore replace it for the duration with
    this version.
    """
    warnings.warn(
        "subTest per-example reporting interacts badly with Hypothesis "
        "trying hundreds of examples, so we disable it for the duration of "
        "any test that uses `@given`.",
        HypothesisWarning,
        stacklevel=2,
    )
    yield


1504@dataclass 

1505class HypothesisHandle: 

1506 """This object is provided as the .hypothesis attribute on @given tests. 

1507 

1508 Downstream users can reassign its attributes to insert custom logic into 

1509 the execution of each case, for example by converting an async into a 

1510 sync function. 

1511 

1512 This must be an attribute of an attribute, because reassignment of a 

1513 first-level attribute would not be visible to Hypothesis if the function 

1514 had been decorated before the assignment. 

1515 

1516 See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more 

1517 information. 

1518 """ 

1519 

1520 inner_test: Any 

1521 _get_fuzz_target: Any 

1522 _given_kwargs: Any 

1523 

1524 @property 

1525 def fuzz_one_input( 

1526 self, 

1527 ) -> Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]: 

1528 """Run the test as a fuzz target, driven with the `buffer` of bytes. 

1529 

1530 Returns None if the buffer was invalid for the strategy, the canonical 

1531 pruned bytes if it was valid, and leaves any raised exceptions alone. 

1532 """ 

1533 # Note: most users, if they care about fuzzer performance, will access the 

1534 # property and assign it to a local variable to move the attribute lookup 

1535 # outside their fuzzing loop / before the fork point. We cache it anyway, 

1536 # so that naive or unusual use-cases get the best possible performance too. 

1537 try: 

1538 return self.__cached_target # type: ignore 

1539 except AttributeError: 

1540 self.__cached_target = self._get_fuzz_target() 

1541 return self.__cached_target 

1542 

1543 
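# Illustrative sketch, not part of core.py: per the docstring above, downstream
# code can reassign .hypothesis.inner_test to hook per-example execution, e.g.
# to run an async test to completion. `_sketch_wrap_async` is hypothetical.
def _sketch_wrap_async(wrapped_test):
    import asyncio

    original = wrapped_test.hypothesis.inner_test

    def run_sync(*args, **kwargs):
        # Each generated example now runs the coroutine to completion.
        return asyncio.run(original(*args, **kwargs))

    wrapped_test.hypothesis.inner_test = run_sync
    return wrapped_test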

1544@overload 

1545def given( 

1546 _: EllipsisType, / 

1547) -> Callable[ 

1548 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[[], None] 

1549]: # pragma: no cover 

1550 ... 

1551 

1552 

1553@overload 

1554def given( 

1555 *_given_arguments: SearchStrategy[Any], 

1556) -> Callable[ 

1557 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1558]: # pragma: no cover 

1559 ... 

1560 

1561 

1562@overload 

1563def given( 

1564 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType], 

1565) -> Callable[ 

1566 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1567]: # pragma: no cover 

1568 ... 

1569 

1570 
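# Illustrative sketch, not part of core.py, of the call forms described by the
# overloads above, as they would appear at a test's definition site:
#
#     @given(...)                       # infer every argument from annotations
#     def test_inferred(x: int): ...
#
#     @given(st.integers(), st.text())  # positional strategies, filled from the right
#     def test_positional(x, y): ...
#
#     @given(x=st.integers(), y=...)    # keyword strategies; ... means "infer y"
#     def test_keyword(x, y: str): ...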

1571def given( 

1572 *_given_arguments: Union[SearchStrategy[Any], EllipsisType], 

1573 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType], 

1574) -> Callable[ 

1575 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1576]: 

1577 """A decorator for turning a test function that accepts arguments into a 

1578 randomized test. 

1579 

1580 This is the main entry point to Hypothesis. 

1581 """ 

1582 

1583 if currently_in_test_context(): 

1584 fail_health_check( 

1585 Settings(), 

1586 "Nesting @given tests results in quadratic generation and shrinking " 

1587 "behavior and can usually be more cleanly expressed by replacing the " 

1588 "inner function with an st.data() parameter on the outer @given.", 

1589 HealthCheck.nested_given, 

1590 ) 

1591 

1592 def run_test_as_given(test): 

1593 if inspect.isclass(test): 

1594 # Provide a meaningful error to users, instead of exceptions from 

1595 # internals that assume we're dealing with a function. 

1596 raise InvalidArgument("@given cannot be applied to a class.") 

1597 given_arguments = tuple(_given_arguments) 

1598 given_kwargs = dict(_given_kwargs) 

1599 

1600 original_sig = get_signature(test) 

1601 if given_arguments == (Ellipsis,) and not given_kwargs: 

1602 # user indicated that they want to infer all arguments 

1603 given_kwargs = { 

1604 p.name: Ellipsis 

1605 for p in original_sig.parameters.values() 

1606 if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY) 

1607 } 

1608 given_arguments = () 

1609 

1610 check_invalid = is_invalid_test( 

1611 test, original_sig, given_arguments, given_kwargs 

1612 ) 

1613 

1614 # If the argument check found problems, return a dummy test function 

1615 # that will raise an error if it is actually called. 

1616 if check_invalid is not None: 

1617 return check_invalid 

1618 

1619 # Because the argument check succeeded, we can convert @given's 

1620 # positional arguments into keyword arguments for simplicity. 

1621 if given_arguments: 

1622 assert not given_kwargs 

1623 posargs = [ 

1624 p.name 

1625 for p in original_sig.parameters.values() 

1626 if p.kind is p.POSITIONAL_OR_KEYWORD 

1627 ] 

1628 given_kwargs = dict(list(zip(posargs[::-1], given_arguments[::-1]))[::-1]) 

1629 # These have been converted, so delete them to prevent accidental use. 

1630 del given_arguments 
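# Illustrative aside, not from the source: e.g. for `def test(self, x)` decorated
# with @given(st.integers()), posargs == ["self", "x"] and the reversed zip
# right-aligns the single strategy, giving given_kwargs == {"x": integers()},
# so `self` is never consumed by a strategy.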

1631 

1632 new_signature = new_given_signature(original_sig, given_kwargs) 

1633 

1634 # Use type information to convert "infer" arguments into appropriate strategies. 

1635 if ... in given_kwargs.values(): 

1636 hints = get_type_hints(test) 

1637 for name in [name for name, value in given_kwargs.items() if value is ...]: 

1638 if name not in hints: 

1639 return _invalid( 

1640 f"passed {name}=... for {test.__name__}, but {name} has " 

1641 "no type annotation", 

1642 test=test, 

1643 given_kwargs=given_kwargs, 

1644 ) 

1645 given_kwargs[name] = st.from_type(hints[name]) 
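# Illustrative aside, not from the source: e.g. @given(x=...) on `def test(x: int)`
# resolves here to given_kwargs == {"x": st.from_type(int)}, which draws integers.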

1646 

1647 prev_self = Unset = object() 

1648 

1649 @impersonate(test) 

1650 @define_function_signature(test.__name__, test.__doc__, new_signature) 

1651 def wrapped_test(*arguments, **kwargs): 

1652 # Tell pytest to omit the body of this function from tracebacks 

1653 __tracebackhide__ = True 

1654 

1655 test = wrapped_test.hypothesis.inner_test 

1656 

1657 if getattr(test, "is_hypothesis_test", False): 

1658 raise InvalidArgument( 

1659 f"You have applied @given to the test {test.__name__} more than " 

1660 "once, which wraps the test several times and is extremely slow. " 

1661 "A similar effect can be gained by combining the arguments " 

1662 "of the two calls to given. For example, instead of " 

1663 "@given(booleans()) @given(integers()), you could write " 

1664 "@given(booleans(), integers())" 

1665 ) 

1666 

1667 settings = wrapped_test._hypothesis_internal_use_settings 

1668 

1669 random = get_random_for_wrapped_test(test, wrapped_test) 

1670 

1671 arguments, kwargs, stuff = process_arguments_to_given( 

1672 wrapped_test, arguments, kwargs, given_kwargs, new_signature.parameters 

1673 ) 

1674 

1675 if ( 

1676 inspect.iscoroutinefunction(test) 

1677 and get_executor(stuff.selfy) is default_executor 

1678 ): 

1679 # See https://github.com/HypothesisWorks/hypothesis/issues/3054 

1680 # If our custom executor doesn't handle coroutines, or we return an 

1681 # awaitable from a non-async-def function, we just rely on the 

1682 # return_value health check. This catches most user errors though. 

1683 raise InvalidArgument( 

1684 "Hypothesis doesn't know how to run async test functions like " 

1685 f"{test.__name__}. You'll need to write a custom executor, " 

1686 "or use a library like pytest-asyncio or pytest-trio which can " 

1687 "handle the translation for you.\n See https://hypothesis." 

1688 "readthedocs.io/en/latest/details.html#custom-function-execution" 

1689 ) 

1690 

1691 runner = stuff.selfy 

1692 if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase): 

1693 msg = ( 

1694 f"You have applied @given to the method {test.__name__}, which is " 

1695 "used by the unittest runner but is not itself a test." 

1696 " This is not useful in any way." 

1697 ) 

1698 fail_health_check(settings, msg, HealthCheck.not_a_test_method) 

1699 if bad_django_TestCase(runner): # pragma: no cover 

1700 # Covered by the Django tests, but not the pytest coverage task 

1701 raise InvalidArgument( 

1702 "You have applied @given to a method on " 

1703 f"{type(runner).__qualname__}, but this " 

1704 "class does not inherit from the supported versions in " 

1705 "`hypothesis.extra.django`. Use the Hypothesis variants " 

1706 "to ensure that each example is run in a separate " 

1707 "database transaction." 

1708 ) 

1709 

1710 nonlocal prev_self 

1711 # Check selfy really is self (not e.g. a mock) before we health-check 

1712 cur_self = ( 

1713 stuff.selfy 

1714 if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test 

1715 else None 

1716 ) 

1717 if prev_self is Unset: 

1718 prev_self = cur_self 

1719 elif cur_self is not prev_self: 

1720 msg = ( 

1721 f"The method {test.__qualname__} was called from multiple " 

1722 "different executors. This may lead to flaky tests and " 

1723 "nonreproducible errors when replaying from database." 

1724 ) 

1725 fail_health_check(settings, msg, HealthCheck.differing_executors) 

1726 

1727 state = StateForActualGivenExecution( 

1728 stuff, test, settings, random, wrapped_test 

1729 ) 

1730 

1731 reproduce_failure = wrapped_test._hypothesis_internal_use_reproduce_failure 

1732 

1733 # If there was a @reproduce_failure decorator, use it to reproduce 

1734 # the error (or complain that we couldn't). Either way, this will 

1735 # always raise some kind of error. 

1736 if reproduce_failure is not None: 

1737 expected_version, failure = reproduce_failure 

1738 if expected_version != __version__: 

1739 raise InvalidArgument( 

1740 "Attempting to reproduce a failure from a different " 

1741 f"version of Hypothesis. This failure is from {expected_version}, but " 

1742 f"you are currently running {__version__!r}. Please change your " 

1743 "Hypothesis version to a matching one." 

1744 ) 

1745 try: 

1746 state.execute_once( 

1747 ConjectureData.for_choices(decode_failure(failure)), 

1748 print_example=True, 

1749 is_final=True, 

1750 ) 

1751 raise DidNotReproduce( 

1752 "Expected the test to raise an error, but it " 

1753 "completed successfully." 

1754 ) 

1755 except StopTest: 

1756 raise DidNotReproduce( 

1757 "The shape of the test data has changed in some way " 

1758 "from where this blob was defined. Are you sure " 

1759 "you're running the same test?" 

1760 ) from None 

1761 except UnsatisfiedAssumption: 

1762 raise DidNotReproduce( 

1763 "The test data failed to satisfy an assumption in the " 

1764 "test. Have you added it since this blob was generated?" 

1765 ) from None 

1766 

1767 # There was no @reproduce_failure, so start by running any explicit 

1768 # examples from @example decorators. 

1769 errors = list( 

1770 execute_explicit_examples( 

1771 state, wrapped_test, arguments, kwargs, original_sig 

1772 ) 

1773 ) 

1774 if errors: 

1775 # If we're not going to report multiple bugs, we would have 

1776 # stopped running explicit examples at the first failure. 

1777 assert len(errors) == 1 or state.settings.report_multiple_bugs 

1778 

1779 # If an explicit example raised a 'skip' exception, ensure it's never 

1780 # wrapped up in an exception group. Because we break out of the loop 

1781 # immediately on finding a skip, if present it's always the last error. 

1782 if isinstance(errors[-1][1], skip_exceptions_to_reraise()): 

1783 # Covered by `test_issue_3453_regression`, just in a subprocess. 

1784 del errors[:-1] # pragma: no cover 

1785 

1786 _raise_to_user(errors, state.settings, [], " in explicit examples") 

1787 

1788 # If there were any explicit examples, they all ran successfully. 

1789 # The next step is to use the Conjecture engine to run the test on 

1790 # many different inputs. 

1791 

1792 ran_explicit_examples = Phase.explicit in state.settings.phases and getattr( 

1793 wrapped_test, "hypothesis_explicit_examples", () 

1794 ) 

1795 SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest( 

1796 "Hypothesis has been told to run no examples for this test." 

1797 ) 

1798 if not ( 

1799 Phase.reuse in settings.phases or Phase.generate in settings.phases 

1800 ): 

1801 if not ran_explicit_examples: 

1802 raise SKIP_BECAUSE_NO_EXAMPLES 

1803 return 

1804 

1805 try: 

1806 if isinstance(runner, TestCase) and hasattr(runner, "subTest"): 

1807 subTest = runner.subTest 

1808 try: 

1809 runner.subTest = types.MethodType(fake_subTest, runner) 

1810 state.run_engine() 

1811 finally: 

1812 runner.subTest = subTest 

1813 else: 

1814 state.run_engine() 

1815 except BaseException as e: 

1816 # The exception caught here should either be an actual test 

1817 # failure (or BaseExceptionGroup), or some kind of fatal error 

1818 # that caused the engine to stop. 

1819 generated_seed = wrapped_test._hypothesis_internal_use_generated_seed 

1820 with local_settings(settings): 

1821 if not (state.failed_normally or generated_seed is None): 

1822 if running_under_pytest: 

1823 report( 

1824 f"You can add @seed({generated_seed}) to this test or " 

1825 f"run pytest with --hypothesis-seed={generated_seed} " 

1826 "to reproduce this failure." 

1827 ) 

1828 else: 

1829 report( 

1830 f"You can add @seed({generated_seed}) to this test to " 

1831 "reproduce this failure." 

1832 ) 

1833 # The dance here is to avoid showing users long tracebacks 

1834 # full of Hypothesis internals they don't care about. 

1835 # We have to do this inline, to avoid adding another 

1836 # internal stack frame just when we've removed the rest. 

1837 # 

1838 # Using a variable for our trimmed error ensures that the line 

1839 # which will actually appear in tracebacks is as clear as 

1840 # possible - "raise the_error_hypothesis_found". 

1841 the_error_hypothesis_found = e.with_traceback( 

1842 None 

1843 if isinstance(e, BaseExceptionGroup) 

1844 else get_trimmed_traceback() 

1845 ) 

1846 raise the_error_hypothesis_found 

1847 

1848 if not (ran_explicit_examples or state.ever_executed): 

1849 raise SKIP_BECAUSE_NO_EXAMPLES 

1850 

1851 def _get_fuzz_target() -> ( 

1852 Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]] 

1853 ): 

1854 # Because fuzzing interfaces are very performance-sensitive, we use a 

1855 # somewhat more complicated structure here. `_get_fuzz_target()` is 

1856 # called by the `HypothesisHandle.fuzz_one_input` property, allowing 

1857 # us to defer our collection of the settings, random instance, and 

1858 # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed. 

1859 # 

1860 # We then share the performance cost of setting up `state` between 

1861 # many invocations of the target. We explicitly force `deadline=None` 

1862 for performance reasons, saving ~40% of the runtime of an empty test. 

1863 test = wrapped_test.hypothesis.inner_test 

1864 settings = Settings( 

1865 parent=wrapped_test._hypothesis_internal_use_settings, deadline=None 

1866 ) 

1867 random = get_random_for_wrapped_test(test, wrapped_test) 

1868 _args, _kwargs, stuff = process_arguments_to_given( 

1869 wrapped_test, (), {}, given_kwargs, new_signature.parameters 

1870 ) 

1871 assert not _args 

1872 assert not _kwargs 

1873 state = StateForActualGivenExecution( 

1874 stuff, test, settings, random, wrapped_test 

1875 ) 

1876 database_key = function_digest(test) + b".secondary" 

1877 # We track the minimal-so-far example for each distinct origin, so 

1878 # that we track log-n instead of n examples for long runs. In particular 

1879 # it means that we saturate for common errors in long runs instead of 

1880 # storing huge volumes of low-value data. 

1881 minimal_failures: dict = {} 

1882 

1883 def fuzz_one_input( 

1884 buffer: Union[bytes, bytearray, memoryview, BinaryIO], 

1885 ) -> Optional[bytes]: 

1886 # This inner part is all that the fuzzer will actually run, 

1887 # so we keep it as small and as fast as possible. 

1888 if isinstance(buffer, io.IOBase): 

1889 buffer = buffer.read(BUFFER_SIZE) 

1890 assert isinstance(buffer, (bytes, bytearray, memoryview)) 

1891 data = ConjectureData( 

1892 random=None, 

1893 provider=BytestringProvider, 

1894 provider_kw={"bytestring": buffer}, 

1895 ) 

1896 try: 

1897 state.execute_once(data) 

1898 except (StopTest, UnsatisfiedAssumption): 

1899 return None 

1900 except BaseException: 

1901 known = minimal_failures.get(data.interesting_origin) 

1902 if settings.database is not None and ( 

1903 known is None or sort_key(data.nodes) <= sort_key(known) 

1904 ): 

1905 settings.database.save( 

1906 database_key, choices_to_bytes(data.choices) 

1907 ) 

1908 minimal_failures[data.interesting_origin] = data.nodes 

1909 raise 

1910 assert isinstance(data.provider, BytestringProvider) 

1911 return bytes(data.provider.drawn) 

1912 

1913 fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__ 

1914 return fuzz_one_input 

1915 

1916 # After having created the decorated test function, we need to copy 

1917 # over some attributes to make the switch as seamless as possible. 

1918 

1919 for attrib in dir(test): 

1920 if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)): 

1921 setattr(wrapped_test, attrib, getattr(test, attrib)) 

1922 wrapped_test.is_hypothesis_test = True 

1923 if hasattr(test, "_hypothesis_internal_settings_applied"): 

1924 # Used to check if @settings is applied twice. 

1925 wrapped_test._hypothesis_internal_settings_applied = True 

1926 wrapped_test._hypothesis_internal_use_seed = getattr( 

1927 test, "_hypothesis_internal_use_seed", None 

1928 ) 

1929 wrapped_test._hypothesis_internal_use_settings = ( 

1930 getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default 

1931 ) 

1932 wrapped_test._hypothesis_internal_use_reproduce_failure = getattr( 

1933 test, "_hypothesis_internal_use_reproduce_failure", None 

1934 ) 

1935 wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs) 

1936 return wrapped_test 

1937 

1938 return run_test_as_given 

1939 

1940 
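# Illustrative sketch, not part of core.py: driving a @given test from an
# external fuzzer via the fuzz_one_input property. The property lookup is
# hoisted out of the loop, as the comment in _get_fuzz_target recommends; the
# names `_sketch_fuzz_target`, `_sketch_fuzz`, and `run_one` are hypothetical.
@given(st.lists(st.integers()))
def _sketch_fuzz_target(xs):
    assert sum(xs) == sum(reversed(xs))

_sketch_fuzz = _sketch_fuzz_target.hypothesis.fuzz_one_input

def run_one(data: bytes):
    # None for invalid buffers, canonical pruned bytes otherwise; failures raise.
    return _sketch_fuzz(data)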

1941def find( 

1942 specifier: SearchStrategy[Ex], 

1943 condition: Callable[[Any], bool], 

1944 *, 

1945 settings: Optional[Settings] = None, 

1946 random: Optional[Random] = None, 

1947 database_key: Optional[bytes] = None, 

1948) -> Ex: 

1949 """Returns the minimal example from the given strategy ``specifier`` that 

1950 matches the predicate function ``condition``.""" 

1951 if settings is None: 

1952 settings = Settings(max_examples=2000) 

1953 settings = Settings( 

1954 settings, suppress_health_check=list(HealthCheck), report_multiple_bugs=False 

1955 ) 

1956 

1957 if database_key is None and settings.database is not None: 

1958 # Note: the database key is not guaranteed to be unique. If keys collide, 

1959 # replaying examples from the database may fail to reproduce because they 

1960 # are replayed against the wrong condition. 

1961 database_key = function_digest(condition) 

1962 

1963 if not isinstance(specifier, SearchStrategy): 

1964 raise InvalidArgument( 

1965 f"Expected SearchStrategy but got {specifier!r} of " 

1966 f"type {type(specifier).__name__}" 

1967 ) 

1968 specifier.validate() 

1969 

1970 last: list[Ex] = [] 

1971 

1972 @settings 

1973 @given(specifier) 

1974 def test(v): 

1975 if condition(v): 

1976 last[:] = [v] 

1977 raise Found 

1978 

1979 if random is not None: 

1980 test = seed(random.getrandbits(64))(test) 

1981 

1982 # Aliasing as Any avoids mypy errors (attr-defined) when accessing and 

1983 # setting custom attributes on the decorated function or class. 

1984 _test: Any = test 

1985 _test._hypothesis_internal_is_find = True 

1986 _test._hypothesis_internal_database_key = database_key 

1987 

1988 try: 

1989 test() 

1990 except Found: 

1991 return last[0] 

1992 

1993 raise NoSuchExample(get_pretty_function_description(condition))
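# Illustrative sketch, not part of core.py: typical find() usage - the shrinker
# reports the minimal example satisfying the condition, which for this strategy
# and predicate is usually [10] (results can vary with settings and database
# state).
#
#     find(st.lists(st.integers()), lambda xs: sum(xs) >= 10)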