Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/core.py: 36%

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""This module provides the core primitives of Hypothesis, such as given."""

import base64
import contextlib
import datetime
import inspect
import io
import math
import sys
import time
import types
import unittest
import warnings
import zlib
from collections import defaultdict
from functools import partial
from random import Random
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Coroutine,
    Hashable,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
    overload,
)
from unittest import TestCase

import attr

from hypothesis import strategies as st
from hypothesis._settings import (
    HealthCheck,
    Phase,
    Verbosity,
    local_settings,
    settings as Settings,
)
from hypothesis.control import BuildContext
from hypothesis.errors import (
    DeadlineExceeded,
    DidNotReproduce,
    FailedHealthCheck,
    Flaky,
    Found,
    HypothesisDeprecationWarning,
    HypothesisWarning,
    InvalidArgument,
    NoSuchExample,
    StopTest,
    Unsatisfiable,
    UnsatisfiedAssumption,
)
from hypothesis.internal.compat import (
    PYPY,
    BaseExceptionGroup,
    add_note,
    bad_django_TestCase,
    get_type_hints,
    int_from_bytes,
)
from hypothesis.internal.conjecture.data import ConjectureData, Status
from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
from hypothesis.internal.conjecture.junkdrawer import (
    ensure_free_stackframes,
    gc_cumulative_time,
)
from hypothesis.internal.conjecture.shrinker import sort_key
from hypothesis.internal.entropy import deterministic_PRNG
from hypothesis.internal.escalation import (
    InterestingOrigin,
    current_pytest_item,
    escalate_hypothesis_internal_error,
    format_exception,
    get_trimmed_traceback,
)
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.observability import (
    OBSERVABILITY_COLLECT_COVERAGE,
    TESTCASE_CALLBACKS,
    _system_metadata,
    deliver_json_blob,
    make_testcase,
)
from hypothesis.internal.reflection import (
    convert_positional_arguments,
    define_function_signature,
    function_digest,
    get_pretty_function_description,
    get_signature,
    impersonate,
    is_mock,
    nicerepr,
    proxies,
    repr_call,
)
from hypothesis.internal.scrutineer import (
    MONITORING_TOOL_ID,
    Trace,
    Tracer,
    explanatory_lines,
    tractable_coverage_report,
)
from hypothesis.internal.validation import check_type
from hypothesis.reporting import (
    current_verbosity,
    report,
    verbose_report,
    with_reporter,
)
from hypothesis.statistics import describe_statistics, describe_targets, note_statistics
from hypothesis.strategies._internal.misc import NOTHING
from hypothesis.strategies._internal.strategies import (
    Ex,
    SearchStrategy,
    check_strategy,
)
from hypothesis.strategies._internal.utils import to_jsonable
from hypothesis.vendor.pretty import RepresentationPrinter
from hypothesis.version import __version__

if sys.version_info >= (3, 10):
    from types import EllipsisType as EllipsisType
elif TYPE_CHECKING:
    from builtins import ellipsis as EllipsisType
else:  # pragma: no cover
    EllipsisType = type(Ellipsis)


TestFunc = TypeVar("TestFunc", bound=Callable)


running_under_pytest = False
pytest_shows_exceptiongroups = True
global_force_seed = None
_hypothesis_global_random = None


@attr.s()
class Example:
    args = attr.ib()
    kwargs = attr.ib()
    # Plus two optional arguments for .xfail()
    raises = attr.ib(default=None)
    reason = attr.ib(default=None)


class example:
    """A decorator which ensures a specific example is always tested."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        if args and kwargs:
            raise InvalidArgument(
                "Cannot mix positional and keyword arguments for examples"
            )
        if not (args or kwargs):
            raise InvalidArgument("An example must provide at least one argument")

        self.hypothesis_explicit_examples: List[Example] = []
        self._this_example = Example(tuple(args), kwargs)

    def __call__(self, test: TestFunc) -> TestFunc:
        if not hasattr(test, "hypothesis_explicit_examples"):
            test.hypothesis_explicit_examples = self.hypothesis_explicit_examples  # type: ignore
        test.hypothesis_explicit_examples.append(self._this_example)  # type: ignore
        return test

    def xfail(
        self,
        condition: bool = True,  # noqa: FBT002
        *,
        reason: str = "",
        raises: Union[
            Type[BaseException], Tuple[Type[BaseException], ...]
        ] = BaseException,
    ) -> "example":
        """Mark this example as an expected failure, similarly to
        :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.

        Expected-failing examples allow you to check that your test does fail on
        some examples, and therefore build confidence that *passing* tests are
        because your code is working, not because the test is missing something.

        .. code-block:: python

            @example(...).xfail()
            @example(...).xfail(reason="Prices must be non-negative")
            @example(...).xfail(raises=(KeyError, ValueError))
            @example(...).xfail(sys.version_info[:2] >= (3, 9), reason="needs py39+")
            @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
            def test(x):
                pass

        .. note::

            Expected-failing examples are handled separately from those generated
            by strategies, so you should usually ensure that there is no overlap.

            .. code-block:: python

                @example(x=1, y=0).xfail(raises=ZeroDivisionError)
                @given(x=st.just(1), y=st.integers())  # Missing `.filter(bool)`!
                def test_fraction(x, y):
                    # This test will try the explicit example and see it fail as
                    # expected, then go on to generate more examples from the
                    # strategy. If we happen to generate y=0, the test will fail
                    # because only the explicit example is treated as xfailing.
                    x / y

        Note that this "method chaining" syntax requires Python 3.9 or later, for
        :pep:`614` relaxing grammar restrictions on decorators. If you need to
        support older versions of Python, you can use an identity function:

        .. code-block:: python

            def identity(x):
                return x


            @identity(example(...).xfail())
            def test(x):
                pass

        """
        check_type(bool, condition, "condition")
        check_type(str, reason, "reason")
        if not (
            isinstance(raises, type) and issubclass(raises, BaseException)
        ) and not (
            isinstance(raises, tuple)
            and raises  # () -> expected to fail with no error, which is impossible
            and all(
                isinstance(r, type) and issubclass(r, BaseException) for r in raises
            )
        ):
            raise InvalidArgument(
                f"{raises=} must be an exception type or tuple of exception types"
            )
        if condition:
            self._this_example = attr.evolve(
                self._this_example, raises=raises, reason=reason
            )
        return self

    def via(self, whence: str, /) -> "example":
        """Attach a machine-readable label noting whence this example came.

        The idea is that tools will be able to add ``@example()`` cases for you, e.g.
        to maintain a high-coverage set of explicit examples, but also *remove* them
        if they become redundant - without ever deleting manually-added examples:

        .. code-block:: python

            # You can choose to annotate examples, or not, as you prefer
            @example(...)
            @example(...).via("regression test for issue #42")

            # The `hy-` prefix is reserved for automated tooling
            @example(...).via("hy-failing")
            @example(...).via("hy-coverage")
            @example(...).via("hy-target-$label")
            def test(x):
                pass

        Note that this "method chaining" syntax requires Python 3.9 or later, for
        :pep:`614` relaxing grammar restrictions on decorators. If you need to
        support older versions of Python, you can use an identity function:

        .. code-block:: python

            def identity(x):
                return x


            @identity(example(...).via("label"))
            def test(x):
                pass

        """
        if not isinstance(whence, str):
            raise InvalidArgument(".via() must be passed a string")
        # This is deliberately a no-op at runtime; the tools operate on source code.
        return self


def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
    """seed: Start the test execution from a specific seed.

    May be any hashable object. No exact meaning for seed is provided
    other than that for a fixed seed value Hypothesis will try the same
    actions (insofar as it can given external sources of non-
    determinism. e.g. timing and hash randomization).

    Overrides the derandomize setting, which is designed to enable
    deterministic builds rather than reproducing observed failures.

    """

    def accept(test):
        test._hypothesis_internal_use_seed = seed
        current_settings = getattr(test, "_hypothesis_internal_use_settings", None)
        test._hypothesis_internal_use_settings = Settings(
            current_settings, database=None
        )
        return test

    return accept
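
# Example use (a sketch; the seed value is arbitrary, any hashable works):
#
#     @seed(12345)
#     @given(st.integers())
#     def test_foo(x):
#         ...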


def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
    """Run the example that corresponds to this data blob in order to reproduce
    a failure.

    A test with this decorator *always* runs only one example and always fails.
    If the provided example does not cause a failure, or is in some way invalid
    for this test, then this will fail with a DidNotReproduce error.

    This decorator is not intended to be a permanent addition to your test
    suite. It's simply some code you can add to ease reproduction of a problem
    in the event that you don't have access to the test database. Because of
    this, *no* compatibility guarantees are made between different versions of
    Hypothesis - its API may change arbitrarily from version to version.
    """

    def accept(test):
        test._hypothesis_internal_use_reproduce_failure = (version, blob)
        return test

    return accept
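
# Example use (a sketch; the version string and blob are copied verbatim from
# the failure report that Hypothesis prints, not written by hand):
#
#     @reproduce_failure("6.98.0", b"AXic...")  # illustrative values only
#     @given(st.floats())
#     def test(x):
#         ...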


def encode_failure(buffer):
    buffer = bytes(buffer)
    compressed = zlib.compress(buffer)
    if len(compressed) < len(buffer):
        buffer = b"\1" + compressed
    else:
        buffer = b"\0" + buffer
    return base64.b64encode(buffer)


def decode_failure(blob):
    try:
        buffer = base64.b64decode(blob)
    except Exception:
        raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None
    prefix = buffer[:1]
    if prefix == b"\0":
        return buffer[1:]
    elif prefix == b"\1":
        try:
            return zlib.decompress(buffer[1:])
        except zlib.error as err:
            raise InvalidArgument(
                f"Invalid zlib compression for blob {blob!r}"
            ) from err
    else:
        raise InvalidArgument(
            f"Could not decode blob {blob!r}: Invalid start byte {prefix!r}"
        )
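
# These two helpers are inverses; a quick round-trip sketch:
#
#     blob = encode_failure(b"\x00\x01\x02")  # short, so stored raw behind b"\0"
#     assert decode_failure(blob) == b"\x00\x01\x02"
#
# Buffers that compress well are instead stored zlib-compressed behind a b"\1"
# prefix, and either form is base64-encoded for safe copy-pasting.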


def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
    @impersonate(test)
    def wrapped_test(*arguments, **kwargs):  # pragma: no cover  # coverage limitation
        raise exc(message)

    wrapped_test.is_hypothesis_test = True
    wrapped_test.hypothesis = HypothesisHandle(
        inner_test=test,
        get_fuzz_target=wrapped_test,
        given_kwargs=given_kwargs,
    )
    return wrapped_test


def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
    """Check the arguments to ``@given`` for basic usage constraints.

    Most errors are not raised immediately; instead we return a dummy test
    function that will raise the appropriate error if it is actually called.
    When the user runs a subset of tests (e.g. via ``pytest -k``), errors will
    only be reported for tests that actually ran.
    """
    invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)

    if not (given_arguments or given_kwargs):
        return invalid("given must be called with at least one argument")

    params = list(original_sig.parameters.values())
    pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
    kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]
    if given_arguments and params != pos_params:
        return invalid(
            "positional arguments to @given are not supported with varargs, "
            "varkeywords, positional-only, or keyword-only arguments"
        )

    if len(given_arguments) > len(pos_params):
        return invalid(
            f"Too many positional arguments for {test.__name__}() were passed to "
            f"@given - expected at most {len(pos_params)} "
            f"arguments, but got {len(given_arguments)} {given_arguments!r}"
        )

    if ... in given_arguments:
        return invalid(
            "... was passed as a positional argument to @given, but may only be "
            "passed as a keyword argument or as the sole argument of @given"
        )

    if given_arguments and given_kwargs:
        return invalid("cannot mix positional and keyword arguments to @given")
    extra_kwargs = [
        k for k in given_kwargs if k not in {p.name for p in pos_params + kwonly_params}
    ]
    if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD):
        arg = extra_kwargs[0]
        return invalid(
            f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
            f"from `{arg}={given_kwargs[arg]!r}` in @given"
        )
    if any(p.default is not p.empty for p in params):
        return invalid("Cannot apply @given to a function with defaults.")

    # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
    # provide a much more helpful error message for people e.g. using the Ghostwriter.
    empty = [
        f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
    ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING]
    if empty:
        strats = "strategies" if len(empty) > 1 else "strategy"
        return invalid(
            f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
            exc=Unsatisfiable,
        )


def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
    assert isinstance(state, StateForActualGivenExecution)
    posargs = [
        p.name
        for p in original_sig.parameters.values()
        if p.kind is p.POSITIONAL_OR_KEYWORD
    ]

    for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
        assert isinstance(example, Example)
        # All of this validation is to check that @example() got "the same" arguments
        # as @given, i.e. corresponding to the same parameters, even though they might
        # be any mixture of positional and keyword arguments.
        if example.args:
            assert not example.kwargs
            if any(
                p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
            ):
                raise InvalidArgument(
                    "Cannot pass positional arguments to @example() when decorating "
                    "a test function which has positional-only parameters."
                )
            if len(example.args) > len(posargs):
                raise InvalidArgument(
                    "example has too many arguments for test. Expected at most "
                    f"{len(posargs)} but got {len(example.args)}"
                )
            example_kwargs = dict(zip(posargs[-len(example.args) :], example.args))
        else:
            example_kwargs = dict(example.kwargs)
        given_kws = ", ".join(
            repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
        )
        example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
        if given_kws != example_kws:
            raise InvalidArgument(
                f"Inconsistent args: @given() got strategies for {given_kws}, "
                f"but @example() got arguments for {example_kws}"
            ) from None

        # This is certainly true because the example_kwargs exactly match the params
        # reserved by @given(), which are then removed from the function signature.
        assert set(example_kwargs).isdisjoint(kwargs)
        example_kwargs.update(kwargs)

        if Phase.explicit not in state.settings.phases:
            continue

        with local_settings(state.settings):
            fragments_reported = []
            empty_data = ConjectureData.for_buffer(b"")
            try:
                bits = ", ".join(nicerepr(x) for x in arguments) + ", ".join(
                    f"{k}={nicerepr(v)}" for k, v in example_kwargs.items()
                )
                execute_example = partial(
                    state.execute_once,
                    empty_data,
                    is_final=True,
                    print_example=True,
                    example_kwargs=example_kwargs,
                )
                with with_reporter(fragments_reported.append):
                    if example.raises is None:
                        execute_example()
                    else:
                        # @example(...).xfail(...)

                        try:
                            execute_example()
                        except failure_exceptions_to_catch() as err:
                            if not isinstance(err, example.raises):
                                raise
                            # Save a string form of this example; we'll warn if it's
                            # ever generated by the strategy (which can't be xfailed)
                            state.xfail_example_reprs.add(
                                repr_call(state.test, arguments, example_kwargs)
                            )
                        except example.raises as err:
                            # We'd usually check this as early as possible, but it's
                            # possible for failure_exceptions_to_catch() to grow when
                            # e.g. pytest is imported between import- and test-time.
                            raise InvalidArgument(
                                f"@example({bits}) raised an expected {err!r}, "
                                "but Hypothesis does not treat this as a test failure"
                            ) from err
                        else:
                            # Unexpectedly passing; always raise an error in this case.
                            reason = f" because {example.reason}" * bool(example.reason)
                            if example.raises is BaseException:
                                name = "exception"  # special-case no raises= arg
                            elif not isinstance(example.raises, tuple):
                                name = example.raises.__name__
                            elif len(example.raises) == 1:
                                name = example.raises[0].__name__
                            else:
                                name = (
                                    ", ".join(ex.__name__ for ex in example.raises[:-1])
                                    + f", or {example.raises[-1].__name__}"
                                )
                            vowel = name.upper()[0] in "AEIOU"
                            raise AssertionError(
                                f"Expected a{'n' * vowel} {name} from @example({bits})"
                                f"{reason}, but no exception was raised."
                            )
            except UnsatisfiedAssumption:
                # Odd though it seems, we deliberately support explicit examples that
                # are then rejected by a call to `assume()`. As well as iterative
                # development, this is rather useful to replay Hypothesis' part of
                # a saved failure when other arguments are supplied by e.g. pytest.
                # See https://github.com/HypothesisWorks/hypothesis/issues/2125
                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
            except BaseException as err:
                # In order to support reporting of multiple failing examples, we yield
                # each of the (report text, error) pairs we find back to the top-level
                # runner. This also ensures that user-facing stack traces have as few
                # frames of Hypothesis internals as possible.
                err = err.with_traceback(get_trimmed_traceback())

                # One user error - whether misunderstanding or typo - we've seen a few
                # times is to pass strategies to @example() where values are expected.
                # Checking is easy, and false-positives not much of a problem, so:
                if isinstance(err, failure_exceptions_to_catch()) and any(
                    isinstance(arg, SearchStrategy)
                    for arg in example.args + tuple(example.kwargs.values())
                ):
                    new = HypothesisWarning(
                        "The @example() decorator expects to be passed values, but "
                        "you passed strategies instead. See https://hypothesis."
                        "readthedocs.io/en/latest/reproducing.html for details."
                    )
                    new.__cause__ = err
                    err = new

                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
                yield (fragments_reported, err)
                if (
                    state.settings.report_multiple_bugs
                    and pytest_shows_exceptiongroups
                    and isinstance(err, failure_exceptions_to_catch())
                    and not isinstance(err, skip_exceptions_to_reraise())
                ):
                    continue
                break
            finally:
                if fragments_reported:
                    assert fragments_reported[0].startswith("Falsifying example")
                    fragments_reported[0] = fragments_reported[0].replace(
                        "Falsifying example", "Falsifying explicit example", 1
                    )

                tc = make_testcase(
                    start_timestamp=state._start_timestamp,
                    test_name_or_nodeid=state.test_identifier,
                    data=empty_data,
                    how_generated="explicit example",
                    string_repr=state._string_repr,
                    timing=state._timing_features,
                )
                deliver_json_blob(tc)

            if fragments_reported:
                verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
                for f in fragments_reported[1:]:
                    verbose_report(f)


def get_random_for_wrapped_test(test, wrapped_test):
    settings = wrapped_test._hypothesis_internal_use_settings
    wrapped_test._hypothesis_internal_use_generated_seed = None

    if wrapped_test._hypothesis_internal_use_seed is not None:
        return Random(wrapped_test._hypothesis_internal_use_seed)
    elif settings.derandomize:
        return Random(int_from_bytes(function_digest(test)))
    elif global_force_seed is not None:
        return Random(global_force_seed)
    else:
        global _hypothesis_global_random
        if _hypothesis_global_random is None:  # pragma: no cover
            _hypothesis_global_random = Random()
        seed = _hypothesis_global_random.getrandbits(128)
        wrapped_test._hypothesis_internal_use_generated_seed = seed
        return Random(seed)


@attr.s
class Stuff:
    selfy: Any = attr.ib(default=None)
    args: tuple = attr.ib(factory=tuple)
    kwargs: dict = attr.ib(factory=dict)
    given_kwargs: dict = attr.ib(factory=dict)


def process_arguments_to_given(wrapped_test, arguments, kwargs, given_kwargs, params):
    selfy = None
    arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)

    # If the test function is a method of some kind, the bound object
    # will be the first named argument if there are any, otherwise the
    # first vararg (if any).
    posargs = [p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD]
    if posargs:
        selfy = kwargs.get(posargs[0])
    elif arguments:
        selfy = arguments[0]

    # Ensure that we don't mistake mocks for self here.
    # This can cause the mock to be used as the test runner.
    if is_mock(selfy):
        selfy = None

    arguments = tuple(arguments)

    with ensure_free_stackframes():
        for k, s in given_kwargs.items():
            check_strategy(s, name=k)
            s.validate()

    stuff = Stuff(selfy=selfy, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs)

    return arguments, kwargs, stuff


def skip_exceptions_to_reraise():
    """Return a tuple of exceptions meaning 'skip this test', to re-raise.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request adding
    it to this function and to tests/cover/test_lazy_import.py
    """
    # This is a set because nose may simply re-export unittest.SkipTest
    exceptions = set()
    # We use this sys.modules trick to avoid importing libraries -
    # you can't be an instance of a type from an unimported module!
    # This is fast enough that we don't need to cache the result,
    # and more importantly it avoids possible side-effects :-)
    if "unittest" in sys.modules:
        exceptions.add(sys.modules["unittest"].SkipTest)
    if "unittest2" in sys.modules:
        exceptions.add(sys.modules["unittest2"].SkipTest)
    if "nose" in sys.modules:
        exceptions.add(sys.modules["nose"].SkipTest)
    if "_pytest" in sys.modules:
        exceptions.add(sys.modules["_pytest"].outcomes.Skipped)
    return tuple(sorted(exceptions, key=str))


def failure_exceptions_to_catch():
    """Return a tuple of exceptions meaning 'this test has failed', to catch.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request.
    """
    # While SystemExit and GeneratorExit are instances of BaseException, we also
    # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
    # them as standard exceptions, check for flakiness, etc.
    # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
    exceptions = [Exception, SystemExit, GeneratorExit]
    if "_pytest" in sys.modules:
        exceptions.append(sys.modules["_pytest"].outcomes.Failed)
    return tuple(exceptions)


def new_given_signature(original_sig, given_kwargs):
    """Make an updated signature for the wrapped test."""
    return original_sig.replace(
        parameters=[
            p
            for p in original_sig.parameters.values()
            if not (
                p.name in given_kwargs
                and p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
            )
        ],
        return_annotation=None,
    )


def default_executor(data, function):
    return function(data)


def get_executor(runner):
    try:
        execute_example = runner.execute_example
    except AttributeError:
        pass
    else:
        return lambda data, function: execute_example(partial(function, data))

    if hasattr(runner, "setup_example") or hasattr(runner, "teardown_example"):
        setup = getattr(runner, "setup_example", None) or (lambda: None)
        teardown = getattr(runner, "teardown_example", None) or (lambda ex: None)

        def execute(data, function):
            token = None
            try:
                token = setup()
                return function(data)
            finally:
                teardown(token)

        return execute

    return default_executor
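
# For example, a test class can customise execution through these hooks (a
# sketch; the hook names are the protocol checked above, the class is not):
#
#     class TestThing:
#         def setup_example(self):
#             ...  # whatever this returns is passed back to teardown_example
#
#         def teardown_example(self, token):
#             ...
#
#         @given(st.integers())
#         def test_a_thing(self, x):
#             ...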


class StateForActualGivenExecution:
    def __init__(self, stuff, test, settings, random, wrapped_test):
        self.test_runner = get_executor(stuff.selfy)
        self.stuff = stuff
        self.settings = settings
        self.last_exception = None
        self.falsifying_examples = ()
        self.random = random
        self.ever_executed = False

        self.is_find = getattr(wrapped_test, "_hypothesis_internal_is_find", False)
        self.wrapped_test = wrapped_test
        self.xfail_example_reprs = set()

        self.test = test

        self.print_given_args = getattr(
            wrapped_test, "_hypothesis_internal_print_given_args", True
        )

        self.files_to_propagate = set()
        self.failed_normally = False
        self.failed_due_to_deadline = False

        self.explain_traces = defaultdict(set)
        self._start_timestamp = time.time()
        self._string_repr = ""
        self._timing_features = {}

    @property
    def test_identifier(self):
        return getattr(
            current_pytest_item.value, "nodeid", None
        ) or get_pretty_function_description(self.wrapped_test)

    def execute_once(
        self,
        data,
        *,
        print_example=False,
        is_final=False,
        expected_failure=None,
        example_kwargs=None,
    ):
        """Run the test function once, using ``data`` as input.

        If the test raises an exception, it will propagate through to the
        caller of this method. Depending on its type, this could represent
        an ordinary test failure, or a fatal error, or a control exception.

        If this method returns normally, the test might have passed, or
        it might have placed ``data`` in an unsuccessful state and then
        swallowed the corresponding control exception.
        """

        self.ever_executed = True
        data.is_find = self.is_find

        self._string_repr = ""
        text_repr = None
        if self.settings.deadline is None and not TESTCASE_CALLBACKS:

            @proxies(self.test)
            def test(*args, **kwargs):
                with ensure_free_stackframes():
                    return self.test(*args, **kwargs)

        else:

            @proxies(self.test)
            def test(*args, **kwargs):
                arg_drawtime = math.fsum(data.draw_times.values())
                arg_stateful = math.fsum(data._stateful_run_times.values())
                arg_gctime = gc_cumulative_time()
                start = time.perf_counter()
                try:
                    with ensure_free_stackframes():
                        result = self.test(*args, **kwargs)
                finally:
                    finish = time.perf_counter()
                    in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
                    in_stateful = (
                        math.fsum(data._stateful_run_times.values()) - arg_stateful
                    )
                    in_gctime = gc_cumulative_time() - arg_gctime
                    runtime = finish - start - in_drawtime - in_stateful - in_gctime
                    self._timing_features = {
                        "execute:test": runtime,
                        "overall:gc": in_gctime,
                        **data.draw_times,
                        **data._stateful_run_times,
                    }

                if (current_deadline := self.settings.deadline) is not None:
                    if not is_final:
                        current_deadline = (current_deadline // 4) * 5
                    if runtime >= current_deadline.total_seconds():
                        raise DeadlineExceeded(
                            datetime.timedelta(seconds=runtime), self.settings.deadline
                        )
                return result

        def run(data):
            # Set up dynamic context needed by a single test run.
            if self.stuff.selfy is not None:
                data.hypothesis_runner = self.stuff.selfy
            # Generate all arguments to the test function.
            args = self.stuff.args
            kwargs = dict(self.stuff.kwargs)
            if example_kwargs is None:
                kw, argslices = context.prep_args_kwargs_from_strategies(
                    self.stuff.given_kwargs
                )
            else:
                kw = example_kwargs
                argslices = {}
            kwargs.update(kw)
            if expected_failure is not None:
                nonlocal text_repr
                text_repr = repr_call(test, args, kwargs)
                if text_repr in self.xfail_example_reprs:
                    warnings.warn(
                        f"We generated {text_repr}, which seems identical "
                        "to one of your `@example(...).xfail()` cases. "
                        "Revise the strategy to avoid this overlap?",
                        HypothesisWarning,
                        # Checked in test_generating_xfailed_examples_warns!
                        stacklevel=6,
                    )

            if print_example or current_verbosity() >= Verbosity.verbose:
                printer = RepresentationPrinter(context=context)
                if print_example:
                    printer.text("Falsifying example:")
                else:
                    printer.text("Trying example:")

                if self.print_given_args:
                    printer.text(" ")
                    printer.repr_call(
                        test.__name__,
                        args,
                        kwargs,
                        force_split=True,
                        arg_slices=argslices,
                        leading_comment=(
                            "# " + context.data.slice_comments[(0, 0)]
                            if (0, 0) in context.data.slice_comments
                            else None
                        ),
                    )
                report(printer.getvalue())

            if TESTCASE_CALLBACKS:
                printer = RepresentationPrinter(context=context)
                printer.repr_call(
                    test.__name__,
                    args,
                    kwargs,
                    force_split=True,
                    arg_slices=argslices,
                    leading_comment=(
                        "# " + context.data.slice_comments[(0, 0)]
                        if (0, 0) in context.data.slice_comments
                        else None
                    ),
                )
                self._string_repr = printer.getvalue()
                data._observability_args = {
                    **dict(enumerate(map(to_jsonable, args))),
                    **{k: to_jsonable(v) for k, v in kwargs.items()},
                }

            try:
                return test(*args, **kwargs)
            except TypeError as e:
                # If we sampled from a sequence of strategies, AND failed with a
                # TypeError, *AND that exception mentions SearchStrategy*, add a note:
                if "SearchStrategy" in str(e) and hasattr(
                    data, "_sampled_from_all_strategies_elements_message"
                ):
                    msg, format_arg = data._sampled_from_all_strategies_elements_message
                    add_note(e, msg.format(format_arg))
                raise
            finally:
                if parts := getattr(data, "_stateful_repr_parts", None):
                    self._string_repr = "\n".join(parts)

        # self.test_runner can include the execute_example method, or setup/teardown
        # _example, so it's important to get the PRNG and build context in place first.
        with local_settings(self.settings):
            with deterministic_PRNG():
                with BuildContext(data, is_final=is_final) as context:
                    # providers may throw in per_case_context_fn, and we'd like
                    # `result` to still be set in these cases.
                    result = None
                    with data.provider.per_test_case_context_manager():
                        # Run the test function once, via the executor hook.
                        # In most cases this will delegate straight to `run(data)`.
                        result = self.test_runner(data, run)

        # If a failure was expected, it should have been raised already, so
        # instead raise an appropriate diagnostic error.
        if expected_failure is not None:
            exception, traceback = expected_failure
            if isinstance(exception, DeadlineExceeded) and (
                runtime_secs := math.fsum(
                    v
                    for k, v in self._timing_features.items()
                    if k.startswith("execute:")
                )
            ):
                report(
                    "Unreliable test timings! On an initial run, this "
                    "test took %.2fms, which exceeded the deadline of "
                    "%.2fms, but on a subsequent run it took %.2f ms, "
                    "which did not. If you expect this sort of "
                    "variability in your test timings, consider turning "
                    "deadlines off for this test by setting deadline=None."
                    % (
                        exception.runtime.total_seconds() * 1000,
                        self.settings.deadline.total_seconds() * 1000,
                        runtime_secs * 1000,
                    )
                )
            else:
                report("Failed to reproduce exception. Expected: \n" + traceback)
            raise Flaky(
                f"Hypothesis {text_repr} produces unreliable results: "
                "Falsified on the first call but did not on a subsequent one"
            ) from exception
        return result

    def _execute_once_for_engine(self, data: ConjectureData) -> None:
        """Wrapper around ``execute_once`` that intercepts test failure
        exceptions and single-test control exceptions, and turns them into
        appropriate method calls to `data` instead.

        This allows the engine to assume that any exception other than
        ``StopTest`` must be a fatal error, and should stop the entire engine.
        """
        trace: Trace = set()
        try:
            # this is actually covered by our tests, but only on >= 3.12.
            if (
                sys.version_info[:2] >= (3, 12)
                and sys.monitoring.get_tool(MONITORING_TOOL_ID) is not None
            ):  # pragma: no cover
                warnings.warn(
                    "avoiding tracing test function because tool id "
                    f"{MONITORING_TOOL_ID} is already taken by tool "
                    f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
                    HypothesisWarning,
                    # I'm not sure computing a correct stacklevel is reasonable
                    # given the number of entry points here.
                    stacklevel=1,
                )

            _can_trace = (
                (sys.version_info[:2] < (3, 12) and sys.gettrace() is None)
                or (
                    sys.version_info[:2] >= (3, 12)
                    and sys.monitoring.get_tool(MONITORING_TOOL_ID) is None
                )
            ) and not PYPY
            _trace_obs = TESTCASE_CALLBACKS and OBSERVABILITY_COLLECT_COVERAGE
            _trace_failure = (
                self.failed_normally
                and not self.failed_due_to_deadline
                and {Phase.shrink, Phase.explain}.issubset(self.settings.phases)
            )
            if _can_trace and (_trace_obs or _trace_failure):  # pragma: no cover
                # This is in fact covered by our *non-coverage* tests, but due to the
                # settrace() contention *not* by our coverage tests. Ah well.
                with Tracer() as tracer:
                    try:
                        result = self.execute_once(data)
                        if data.status == Status.VALID:
                            self.explain_traces[None].add(frozenset(tracer.branches))
                    finally:
                        trace = tracer.branches
            else:
                result = self.execute_once(data)
            if result is not None:
                fail_health_check(
                    self.settings,
                    "Tests run under @given should return None, but "
                    f"{self.test.__name__} returned {result!r} instead.",
                    HealthCheck.return_value,
                )
        except UnsatisfiedAssumption as e:
            # An "assume" check failed, so instead we inform the engine that
            # this test run was invalid.
            data.mark_invalid(e.reason)
        except StopTest:
            # The engine knows how to handle this control exception, so it's
            # OK to re-raise it.
            raise
        except (
            HypothesisDeprecationWarning,
            FailedHealthCheck,
            *skip_exceptions_to_reraise(),
        ):
            # These are fatal errors or control exceptions that should stop the
            # engine, so we re-raise them.
            raise
        except failure_exceptions_to_catch() as e:
            # If the error was raised by Hypothesis-internal code, re-raise it
            # as a fatal error instead of treating it as a test failure.
            escalate_hypothesis_internal_error()

            if data.frozen:
                # This can happen if an error occurred in a finally
                # block somewhere, suppressing our original StopTest.
                # We raise a new one here to resume normal operation.
                raise StopTest(data.testcounter) from e
            else:
                # The test failed by raising an exception, so we inform the
                # engine that this test run was interesting. This is the normal
                # path for test runs that fail.
                tb = get_trimmed_traceback()
                info = data.extra_information
                info._expected_traceback = format_exception(e, tb)  # type: ignore
                info._expected_exception = e  # type: ignore
                verbose_report(info._expected_traceback)  # type: ignore

                self.failed_normally = True

                interesting_origin = InterestingOrigin.from_exception(e)
                if trace:  # pragma: no cover
                    # Trace collection is explicitly disabled under coverage.
                    self.explain_traces[interesting_origin].add(frozenset(trace))
                if interesting_origin[0] == DeadlineExceeded:
                    self.failed_due_to_deadline = True
                    self.explain_traces.clear()
                data.mark_interesting(interesting_origin)  # type: ignore  # mypy bug?
        finally:
            # Conditional here so we can save some time constructing the payload; in
            # other cases (without coverage) it's cheap enough to do that regardless.
            if TESTCASE_CALLBACKS:
                if runner := getattr(self, "_runner", None):
                    phase = runner._current_phase
                elif self.failed_normally or self.failed_due_to_deadline:
                    phase = "shrink"
                else:  # pragma: no cover  # in case of messing with internals
                    phase = "unknown"
                backend_desc = f", using backend={self.settings.backend!r}" * (
                    self.settings.backend != "hypothesis"
                    and not getattr(runner, "_switch_to_hypothesis_provider", False)
                )
                tc = make_testcase(
                    start_timestamp=self._start_timestamp,
                    test_name_or_nodeid=self.test_identifier,
                    data=data,
                    how_generated=f"during {phase} phase{backend_desc}",
                    string_repr=self._string_repr,
                    arguments=data._observability_args,
                    timing=self._timing_features,
                    coverage=tractable_coverage_report(trace) or None,
                    phase=phase,
                )
                deliver_json_blob(tc)
                self._timing_features = {}

    def run_engine(self):
        """Run the test function many times, on database input and generated
        input, using the Conjecture engine.
        """
        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True
        try:
            database_key = self.wrapped_test._hypothesis_internal_database_key
        except AttributeError:
            if global_force_seed is None:
                database_key = function_digest(self.test)
            else:
                database_key = None

        runner = self._runner = ConjectureRunner(
            self._execute_once_for_engine,
            settings=self.settings,
            random=self.random,
            database_key=database_key,
        )
        # Use the Conjecture engine to run the test function many times
        # on different inputs.
        runner.run()
        note_statistics(runner.statistics)
        deliver_json_blob(
            {
                "type": "info",
                "run_start": self._start_timestamp,
                "property": self.test_identifier,
                "title": "Hypothesis Statistics",
                "content": describe_statistics(runner.statistics),
            }
        )

        if runner.call_count == 0:
            return
        if runner.interesting_examples:
            self.falsifying_examples = sorted(
                runner.interesting_examples.values(),
                key=lambda d: sort_key(d.buffer),
                reverse=True,
            )
        else:
            if runner.valid_examples == 0:
                rep = get_pretty_function_description(self.test)
                raise Unsatisfiable(f"Unable to satisfy assumptions of {rep}")

        if not self.falsifying_examples:
            return
        elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
            # Pretend that we only found one failure, by discarding the others.
            del self.falsifying_examples[:-1]

        # The engine found one or more failures, so we need to reproduce and
        # report them.

        errors_to_report = []

        report_lines = describe_targets(runner.best_observed_targets)
        if report_lines:
            report_lines.append("")

        explanations = explanatory_lines(self.explain_traces, self.settings)
        for falsifying_example in self.falsifying_examples:
            info = falsifying_example.extra_information
            fragments = []

            ran_example = runner.new_conjecture_data_for_buffer(
                falsifying_example.buffer
            )
            ran_example.slice_comments = falsifying_example.slice_comments
            tb = None
            origin = None
            assert info._expected_exception is not None
            try:
                with with_reporter(fragments.append):
                    self.execute_once(
                        ran_example,
                        print_example=not self.is_find,
                        is_final=True,
                        expected_failure=(
                            info._expected_exception,
                            info._expected_traceback,
                        ),
                    )
            except (UnsatisfiedAssumption, StopTest) as e:
                err = Flaky(
                    "Unreliable assumption: An example which satisfied "
                    "assumptions on the first run now fails it.",
                )
                err.__cause__ = err.__context__ = e
                errors_to_report.append((fragments, err))
            except BaseException as e:
                # If we have anything for explain-mode, this is the time to report.
                fragments.extend(explanations[falsifying_example.interesting_origin])
                errors_to_report.append(
                    (fragments, e.with_traceback(get_trimmed_traceback()))
                )
                tb = format_exception(e, get_trimmed_traceback(e))
                origin = InterestingOrigin.from_exception(e)
            else:
                # execute_once() will always raise either the expected error, or Flaky.
                raise NotImplementedError("This should be unreachable")
            finally:
                # log our observability line for the final failing example
                tc = {
                    "type": "test_case",
                    "run_start": self._start_timestamp,
                    "property": self.test_identifier,
                    "status": "passed" if sys.exc_info()[0] else "failed",
                    "status_reason": str(origin or "unexpected/flaky pass"),
                    "representation": self._string_repr,
                    "arguments": ran_example._observability_args,
                    "how_generated": "minimal failing example",
                    "features": {
                        **{
                            f"target:{k}".strip(":"): v
                            for k, v in ran_example.target_observations.items()
                        },
                        **ran_example.events,
                    },
                    "timing": self._timing_features,
                    "coverage": None,  # Not recorded when we're replaying the MFE
                    "metadata": {
                        "traceback": tb,
                        "predicates": ran_example._observability_predicates,
                        **_system_metadata(),
                    },
                }
                deliver_json_blob(tc)
                # Whether or not replay actually raised the exception again, we want
                # to print the reproduce_failure decorator for the failing example.
                if self.settings.print_blob:
                    fragments.append(
                        "\nYou can reproduce this example by temporarily adding "
                        "@reproduce_failure(%r, %r) as a decorator on your test case"
                        % (__version__, encode_failure(falsifying_example.buffer))
                    )
                # Mostly useful for ``find`` and ensuring that objects that
                # hold on to a reference to ``data`` know that it's now been
                # finished and they can't draw more data from it.
                ran_example.freeze()  # pragma: no branch
                # No branch is possible here because we never have an active exception.
        _raise_to_user(errors_to_report, self.settings, report_lines)


def _raise_to_user(errors_to_report, settings, target_lines, trailer=""):
    """Helper function for attaching notes and grouping multiple errors."""
    failing_prefix = "Falsifying example: "
    ls = []
    for fragments, err in errors_to_report:
        for note in fragments:
            add_note(err, note)
            if note.startswith(failing_prefix):
                ls.append(note[len(failing_prefix) :])
    if current_pytest_item.value:
        current_pytest_item.value._hypothesis_failing_examples = ls

    if len(errors_to_report) == 1:
        _, the_error_hypothesis_found = errors_to_report[0]
    else:
        assert errors_to_report
        the_error_hypothesis_found = BaseExceptionGroup(
            f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
            [e for _, e in errors_to_report],
        )

    if settings.verbosity >= Verbosity.normal:
        for line in target_lines:
            add_note(the_error_hypothesis_found, line)
    raise the_error_hypothesis_found


@contextlib.contextmanager
def fake_subTest(self, msg=None, **__):
    """Monkeypatch for `unittest.TestCase.subTest` during `@given`.

    If we don't patch this out, each failing example is reported as a
    separate failing test by the unittest test runner, which is
    obviously incorrect. We therefore replace it for the duration with
    this version.
    """
    warnings.warn(
        "subTest per-example reporting interacts badly with Hypothesis "
        "trying hundreds of examples, so we disable it for the duration of "
        "any test that uses `@given`.",
        HypothesisWarning,
        stacklevel=2,
    )
    yield


@attr.s()
class HypothesisHandle:
    """This object is provided as the .hypothesis attribute on @given tests.

    Downstream users can reassign its attributes to insert custom logic into
    the execution of each case, for example by converting an async into a
    sync function.

    This must be an attribute of an attribute, because reassignment of a
    first-level attribute would not be visible to Hypothesis if the function
    had been decorated before the assignment.

    See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more
    information.
    """

    inner_test = attr.ib()
    _get_fuzz_target = attr.ib()
    _given_kwargs = attr.ib()

    @property
    def fuzz_one_input(
        self,
    ) -> Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]:
        """Run the test as a fuzz target, driven with the `buffer` of bytes.

        Returns None if buffer invalid for the strategy, canonical pruned
        bytes if the buffer was valid, and leaves raised exceptions alone.
        """
        # Note: most users, if they care about fuzzer performance, will access the
        # property and assign it to a local variable to move the attribute lookup
        # outside their fuzzing loop / before the fork point. We cache it anyway,
        # so that naive or unusual use-cases get the best possible performance too.
        try:
            return self.__cached_target  # type: ignore
        except AttributeError:
            self.__cached_target = self._get_fuzz_target()
            return self.__cached_target
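
# Typical fuzzer integration, per the note above (a sketch; `fuzzer_data` is a
# hypothetical source of byte strings supplied by your fuzzing engine):
#
#     fuzz = test_something.hypothesis.fuzz_one_input  # bind once, outside the loop
#     for buffer in fuzzer_data:
#         fuzz(buffer)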

1359 

1360 

1361@overload 

1362def given( 

1363 _: EllipsisType, / 

1364) -> Callable[ 

1365 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[[], None] 

1366]: # pragma: no cover 

1367 ... 

1368 

1369 

1370@overload 

1371def given( 

1372 *_given_arguments: SearchStrategy[Any], 

1373) -> Callable[ 

1374 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1375]: # pragma: no cover 

1376 ... 

1377 

1378 

1379@overload 

1380def given( 

1381 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType], 

1382) -> Callable[ 

1383 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1384]: # pragma: no cover 

1385 ... 

1386 

1387 

1388def given( 

1389 *_given_arguments: Union[SearchStrategy[Any], EllipsisType], 

1390 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType], 

1391) -> Callable[ 

1392 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1393]: 

1394 """A decorator for turning a test function that accepts arguments into a 

1395 randomized test. 

1396 

1397 This is the main entry point to Hypothesis. 

1398 """ 

1399 

1400 def run_test_as_given(test): 

1401 if inspect.isclass(test): 

1402 # Provide a meaningful error to users, instead of exceptions from 

1403 # internals that assume we're dealing with a function. 

1404 raise InvalidArgument("@given cannot be applied to a class.") 

1405 given_arguments = tuple(_given_arguments) 

1406 given_kwargs = dict(_given_kwargs) 

1407 

1408 original_sig = get_signature(test) 

1409 if given_arguments == (Ellipsis,) and not given_kwargs: 

1410 # user indicated that they want to infer all arguments 

1411 given_kwargs = { 

1412 p.name: Ellipsis 

1413 for p in original_sig.parameters.values() 

1414 if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY) 

1415 } 

1416 given_arguments = () 

1417 

1418 check_invalid = is_invalid_test( 

1419 test, original_sig, given_arguments, given_kwargs 

1420 ) 

1421 

1422 # If the argument check found problems, return a dummy test function 

1423 # that will raise an error if it is actually called. 

1424 if check_invalid is not None: 

1425 return check_invalid 

1426 

1427 # Because the argument check succeeded, we can convert @given's 

1428 # positional arguments into keyword arguments for simplicity. 

1429 if given_arguments: 

1430 assert not given_kwargs 

1431 posargs = [ 

1432 p.name 

1433 for p in original_sig.parameters.values() 

1434 if p.kind is p.POSITIONAL_OR_KEYWORD 

1435 ] 

1436 given_kwargs = dict(list(zip(posargs[::-1], given_arguments[::-1]))[::-1]) 

1437 # These have been converted, so delete them to prevent accidental use. 

1438 del given_arguments 

1439 

1440 new_signature = new_given_signature(original_sig, given_kwargs) 

1441 

1442 # Use type information to convert "infer" arguments into appropriate strategies. 

1443 if ... in given_kwargs.values(): 

1444 hints = get_type_hints(test) 

1445 for name in [name for name, value in given_kwargs.items() if value is ...]: 

1446 if name not in hints: 

1447 return _invalid( 

1448 f"passed {name}=... for {test.__name__}, but {name} has " 

1449 "no type annotation", 

1450 test=test, 

1451 given_kwargs=given_kwargs, 

1452 ) 

1453 given_kwargs[name] = st.from_type(hints[name]) 

1454 

1455 prev_self = Unset = object() 

1456 

1457 @impersonate(test) 

1458 @define_function_signature(test.__name__, test.__doc__, new_signature) 

1459 def wrapped_test(*arguments, **kwargs): 

1460 # Tell pytest to omit the body of this function from tracebacks 

1461 __tracebackhide__ = True 

1462 

1463 test = wrapped_test.hypothesis.inner_test 

1464 

1465 if getattr(test, "is_hypothesis_test", False): 

1466 raise InvalidArgument( 

1467 f"You have applied @given to the test {test.__name__} more than " 

1468 "once, which wraps the test several times and is extremely slow. " 

1469 "A similar effect can be gained by combining the arguments " 

1470 "of the two calls to given. For example, instead of " 

1471 "@given(booleans()) @given(integers()), you could write " 

1472 "@given(booleans(), integers())" 

1473 ) 

1474 

1475 settings = wrapped_test._hypothesis_internal_use_settings 

1476 

1477 random = get_random_for_wrapped_test(test, wrapped_test) 

1478 

1479 arguments, kwargs, stuff = process_arguments_to_given( 

1480 wrapped_test, arguments, kwargs, given_kwargs, new_signature.parameters 

1481 ) 

1482 

1483 if ( 

1484 inspect.iscoroutinefunction(test) 

1485 and get_executor(stuff.selfy) is default_executor 

1486 ): 

1487 # See https://github.com/HypothesisWorks/hypothesis/issues/3054 

1488 # If our custom executor doesn't handle coroutines, or we return an 

1489 # awaitable from a non-async-def function, we just rely on the 

1490 # return_value health check. This catches most user errors though. 

1491 raise InvalidArgument( 

1492 "Hypothesis doesn't know how to run async test functions like " 

1493 f"{test.__name__}. You'll need to write a custom executor, " 

1494 "or use a library like pytest-asyncio or pytest-trio which can " 

1495 "handle the translation for you.\n See https://hypothesis." 

1496 "readthedocs.io/en/latest/details.html#custom-function-execution" 

1497 ) 

1498 

1499 runner = stuff.selfy 

1500 if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase): 

1501 msg = ( 

1502 f"You have applied @given to the method {test.__name__}, which is " 

1503 "used by the unittest runner but is not itself a test." 

1504 " This is not useful in any way." 

1505 ) 

1506 fail_health_check(settings, msg, HealthCheck.not_a_test_method) 

1507 if bad_django_TestCase(runner): # pragma: no cover 

1508 # Covered by the Django tests, but not the pytest coverage task 

1509 raise InvalidArgument( 

1510 "You have applied @given to a method on " 

1511 f"{type(runner).__qualname__}, but this " 

1512 "class does not inherit from the supported versions in " 

1513 "`hypothesis.extra.django`. Use the Hypothesis variants " 

1514 "to ensure that each example is run in a separate " 

1515 "database transaction." 

1516 ) 

1517 if settings.database is not None: 

1518 nonlocal prev_self 

1519 # Check selfy really is self (not e.g. a mock) before we health-check 

1520 cur_self = ( 

1521 stuff.selfy 

1522 if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test 

1523 else None 

1524 ) 

1525 if prev_self is Unset: 

1526 prev_self = cur_self 

1527 elif cur_self is not prev_self: 

1528 msg = ( 

1529 f"The method {test.__qualname__} was called from multiple " 

1530 "different executors. This may lead to flaky tests and " 

1531 "nonreproducible errors when replaying from database." 

1532 ) 

1533 fail_health_check(settings, msg, HealthCheck.differing_executors) 

1534 

1535 state = StateForActualGivenExecution( 

1536 stuff, test, settings, random, wrapped_test 

1537 ) 

1538 

1539 reproduce_failure = wrapped_test._hypothesis_internal_use_reproduce_failure 

1540 

1541 # If there was a @reproduce_failure decorator, use it to reproduce 

1542 # the error (or complain that we couldn't). Either way, this will 

1543 # always raise some kind of error. 

1544 if reproduce_failure is not None: 

1545 expected_version, failure = reproduce_failure 

1546 if expected_version != __version__: 

1547 raise InvalidArgument( 

1548 "Attempting to reproduce a failure from a different " 

1549 "version of Hypothesis. This failure is from %s, but " 

1550 "you are currently running %r. Please change your " 

1551 "Hypothesis version to a matching one." 

1552 % (expected_version, __version__) 

1553 ) 

1554 try: 

1555 state.execute_once( 

1556 ConjectureData.for_buffer(decode_failure(failure)), 

1557 print_example=True, 

1558 is_final=True, 

1559 ) 

1560 raise DidNotReproduce( 

1561 "Expected the test to raise an error, but it " 

1562 "completed successfully." 

1563 ) 

1564 except StopTest: 

1565 raise DidNotReproduce( 

1566 "The shape of the test data has changed in some way " 

1567 "from where this blob was defined. Are you sure " 

1568 "you're running the same test?" 

1569 ) from None 

1570 except UnsatisfiedAssumption: 

1571 raise DidNotReproduce( 

1572 "The test data failed to satisfy an assumption in the " 

1573 "test. Have you added it since this blob was generated?" 

1574 ) from None 

1575 
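# Illustrative sketch, not part of core.py: the blob format consumed by
# decode_failure above, mirroring encode_failure/decode_failure defined
# earlier in this module: base64 over a flag byte plus a possibly
# zlib-compressed buffer.
import base64 as _base64
import zlib as _zlib

def _encode(buffer: bytes) -> bytes:
    compressed = _zlib.compress(buffer)
    if len(compressed) < len(buffer):
        return _base64.b64encode(b"\1" + compressed)
    return _base64.b64encode(b"\0" + buffer)

def _decode(blob: bytes) -> bytes:
    raw = _base64.b64decode(blob)
    return _zlib.decompress(raw[1:]) if raw[:1] == b"\1" else raw[1:]

assert _decode(_encode(bytes(100))) == bytes(100)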

1576 # There was no @reproduce_failure, so start by running any explicit 

1577 # examples from @example decorators. 

1578 errors = list( 

1579 execute_explicit_examples( 

1580 state, wrapped_test, arguments, kwargs, original_sig 

1581 ) 

1582 ) 

1583 if errors: 

1584 # If we're not going to report multiple bugs, we would have 

1585 # stopped running explicit examples at the first failure. 

1586 assert len(errors) == 1 or state.settings.report_multiple_bugs 

1587 

1588 # If an explicit example raised a 'skip' exception, ensure it's never 

1589 # wrapped up in an exception group. Because we break out of the loop 

1590 # immediately on finding a skip, if present it's always the last error. 

1591 if isinstance(errors[-1][1], skip_exceptions_to_reraise()): 

1592 # Covered by `test_issue_3453_regression`, just in a subprocess. 

1593 del errors[:-1] # pragma: no cover 

1594 

1595 _raise_to_user(errors, state.settings, [], " in explicit examples") 

1596 
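# Illustrative sketch, not part of core.py: the explicit-examples phase
# exercised above, using only the public @example decorator. These
# inputs always run before any generated ones.
from hypothesis import example as _example, given as _given, strategies as _st

@_example(0)
@_example(-1)
@_given(_st.integers())
def test_with_explicit_examples(n):
    assert n * 0 == 0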

1597 # If there were any explicit examples, they all ran successfully. 

1598 # The next step is to use the Conjecture engine to run the test on 

1599 # many different inputs. 

1600 

1601 ran_explicit_examples = Phase.explicit in state.settings.phases and getattr( 

1602 wrapped_test, "hypothesis_explicit_examples", () 

1603 ) 

1604 SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest( 

1605 "Hypothesis has been told to run no examples for this test." 

1606 ) 

1607 if not ( 

1608 Phase.reuse in settings.phases or Phase.generate in settings.phases 

1609 ): 

1610 if not ran_explicit_examples: 

1611 raise SKIP_BECAUSE_NO_EXAMPLES 

1612 return 

1613 
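# Illustrative sketch, not part of core.py: a settings profile that
# reaches the skip above. With the reuse and generate phases disabled
# and no @example decorators, there is nothing for Hypothesis to run.
from hypothesis import Phase as _Phase, given as _given, settings as _settings
from hypothesis import strategies as _st

@_settings(phases=[_Phase.explicit])
@_given(_st.integers())
def test_explicit_phase_only(n):
    pass  # raises SkipTest unless explicit examples are present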

1614 try: 

1615 if isinstance(runner, TestCase) and hasattr(runner, "subTest"): 

1616 subTest = runner.subTest 

1617 try: 

1618 runner.subTest = types.MethodType(fake_subTest, runner) 

1619 state.run_engine() 

1620 finally: 

1621 runner.subTest = subTest 

1622 else: 

1623 state.run_engine() 

1624 except BaseException as e: 

1625 # The exception caught here should either be an actual test 

1626 # failure (or BaseExceptionGroup), or some kind of fatal error 

1627 # that caused the engine to stop. 

1628 

1629 generated_seed = wrapped_test._hypothesis_internal_use_generated_seed 

1630 with local_settings(settings): 

1631 if not (state.failed_normally or generated_seed is None): 

1632 if running_under_pytest: 

1633 report( 

1634 f"You can add @seed({generated_seed}) to this test or " 

1635 f"run pytest with --hypothesis-seed={generated_seed} " 

1636 "to reproduce this failure." 

1637 ) 

1638 else: 

1639 report( 

1640 f"You can add @seed({generated_seed}) to this test to " 

1641 "reproduce this failure." 

1642 ) 

1643 # The dance here is to avoid showing users long tracebacks 

1644 # full of Hypothesis internals they don't care about. 

1645 # We have to do this inline, to avoid adding another 

1646 # internal stack frame just when we've removed the rest. 

1647 # 

1648 # Using a variable for our trimmed error ensures that the line 

1649 # which will actually appear in tracebacks is as clear as 

1650 # possible - "raise the_error_hypothesis_found". 

1651 the_error_hypothesis_found = e.with_traceback( 

1652 None 

1653 if isinstance(e, BaseExceptionGroup) 

1654 else get_trimmed_traceback() 

1655 ) 

1656 raise the_error_hypothesis_found 

1657 

1658 if not (ran_explicit_examples or state.ever_executed): 

1659 raise SKIP_BECAUSE_NO_EXAMPLES 

1660 
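# Illustrative sketch, not part of core.py: the reproduction hint printed
# above, with a placeholder seed value standing in for the one Hypothesis
# reports.
from hypothesis import given as _given, seed as _seed, strategies as _st

@_seed(12345)  # placeholder: use the value from the printed hint
@_given(_st.integers())
def test_seeded(n):
    assert isinstance(n, int)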

1661 def _get_fuzz_target() -> ( 

1662 Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]] 

1663 ): 

1664 # Because fuzzing interfaces are very performance-sensitive, we use a 

1665 # somewhat more complicated structure here. `_get_fuzz_target()` is 

1666 # called by the `HypothesisHandle.fuzz_one_input` property, allowing 

1667 # us to defer our collection of the settings, random instance, and 

1668 # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed. 

1669 # 

1670 # We then share the performance cost of setting up `state` between 

1671 # many invocations of the target. We explicitly force `deadline=None` 

1672 # for performance reasons, saving ~40% of the runtime of an empty test. 

1673 test = wrapped_test.hypothesis.inner_test 

1674 settings = Settings( 

1675 parent=wrapped_test._hypothesis_internal_use_settings, deadline=None 

1676 ) 

1677 random = get_random_for_wrapped_test(test, wrapped_test) 

1678 _args, _kwargs, stuff = process_arguments_to_given( 

1679 wrapped_test, (), {}, given_kwargs, new_signature.parameters 

1680 ) 

1681 assert not _args 

1682 assert not _kwargs 

1683 state = StateForActualGivenExecution( 

1684 stuff, test, settings, random, wrapped_test 

1685 ) 

1686 digest = function_digest(test) 

1687 # We track the minimal-so-far example for each distinct origin, so 

1688 # that we store O(log n) rather than O(n) examples over a long run. 

1689 # In particular, this means we saturate for common errors instead of 

1690 # storing huge volumes of low-value data. 

1691 minimal_failures: dict = {} 

1692 
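# Illustrative sketch, not part of core.py: toy version of the
# minimal-per-origin bookkeeping above, assuming shortlex ordering as in
# sort_key (shorter buffers first, then lexicographic).
def _shortlex(buffer: bytes):
    return (len(buffer), buffer)

_minimal: dict = {}
for _origin, _buf in [("ValueError", b"\x05\x05"), ("ValueError", b"\x01")]:
    _known = _minimal.get(_origin)
    if _known is None or _shortlex(_buf) <= _shortlex(_known):
        _minimal[_origin] = _buf
assert _minimal["ValueError"] == b"\x01"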

1693 def fuzz_one_input( 

1694 buffer: Union[bytes, bytearray, memoryview, BinaryIO] 

1695 ) -> Optional[bytes]: 

1696 # This inner part is all that the fuzzer will actually run, 

1697 # so we keep it as small and as fast as possible. 

1698 if isinstance(buffer, io.IOBase): 

1699 buffer = buffer.read(BUFFER_SIZE) 

1700 assert isinstance(buffer, (bytes, bytearray, memoryview)) 

1701 data = ConjectureData.for_buffer(buffer) 

1702 try: 

1703 state.execute_once(data) 

1704 except (StopTest, UnsatisfiedAssumption): 

1705 return None 

1706 except BaseException: 

1707 buffer = bytes(data.buffer) 

1708 known = minimal_failures.get(data.interesting_origin) 

1709 if settings.database is not None and ( 

1710 known is None or sort_key(buffer) <= sort_key(known) 

1711 ): 

1712 settings.database.save(digest, buffer) 

1713 minimal_failures[data.interesting_origin] = buffer 

1714 raise 

1715 return bytes(data.buffer) 

1716 

1717 fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__ 

1718 return fuzz_one_input 

1719 
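# Illustrative sketch, not part of core.py: how the hook built above is
# typically consumed. fuzz_one_input is the public attribute; the atheris
# driver in the trailing comment is one possible host, not a requirement.
from hypothesis import given as _given, strategies as _st

@_given(_st.integers())
def test_fuzz_target(n):
    assert isinstance(n, int)

_fuzz = test_fuzz_target.hypothesis.fuzz_one_input

# A libFuzzer-style driver feeds raw byte buffers, e.g. with atheris:
#   import atheris, sys
#   atheris.Setup(sys.argv, _fuzz)
#   atheris.Fuzz()
_fuzz(b"\x00" * 16)  # a direct call with a fixed buffer also works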

1720 # Having created the decorated test function, we now copy over some 

1721 # attributes to make the switch as seamless as possible. 

1722 

1723 for attrib in dir(test): 

1724 if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)): 

1725 setattr(wrapped_test, attrib, getattr(test, attrib)) 

1726 wrapped_test.is_hypothesis_test = True 

1727 if hasattr(test, "_hypothesis_internal_settings_applied"): 

1728 # Used to check if @settings is applied twice. 

1729 wrapped_test._hypothesis_internal_settings_applied = True 

1730 wrapped_test._hypothesis_internal_use_seed = getattr( 

1731 test, "_hypothesis_internal_use_seed", None 

1732 ) 

1733 wrapped_test._hypothesis_internal_use_settings = ( 

1734 getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default 

1735 ) 

1736 wrapped_test._hypothesis_internal_use_reproduce_failure = getattr( 

1737 test, "_hypothesis_internal_use_reproduce_failure", None 

1738 ) 

1739 wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs) 

1740 return wrapped_test 

1741 

1742 return run_test_as_given 

1743 

1744 

1745def find( 

1746 specifier: SearchStrategy[Ex], 

1747 condition: Callable[[Any], bool], 

1748 *, 

1749 settings: Optional[Settings] = None, 

1750 random: Optional[Random] = None, 

1751 database_key: Optional[bytes] = None, 

1752) -> Ex: 

1753 """Returns the minimal example from the given strategy ``specifier`` that 

1754 matches the predicate function ``condition``.""" 

1755 if settings is None: 

1756 settings = Settings(max_examples=2000) 

1757 settings = Settings( 

1758 settings, suppress_health_check=list(HealthCheck), report_multiple_bugs=False 

1759 ) 

1760 

1761 if database_key is None and settings.database is not None: 

1762 # Note: the database key is not guaranteed to be unique. If it collides, 

1763 # replayed database examples may fail to reproduce because they are run 

1764 # against the wrong condition. 

1765 database_key = function_digest(condition) 

1766 

1767 if not isinstance(specifier, SearchStrategy): 

1768 raise InvalidArgument( 

1769 f"Expected SearchStrategy but got {specifier!r} of " 

1770 f"type {type(specifier).__name__}" 

1771 ) 

1772 specifier.validate() 

1773 

1774 last: List[Ex] = [] 

1775 

1776 @settings 

1777 @given(specifier) 

1778 def test(v): 

1779 if condition(v): 

1780 last[:] = [v] 

1781 raise Found 

1782 

1783 if random is not None: 

1784 test = seed(random.getrandbits(64))(test) 

1785 

1786 # Aliasing as Any avoids mypy errors (attr-defined) when accessing and 

1787 # setting custom attributes on the decorated function or class. 

1788 _test: Any = test 

1789 _test._hypothesis_internal_is_find = True 

1790 _test._hypothesis_internal_database_key = database_key 

1791 

1792 try: 

1793 test() 

1794 except Found: 

1795 return last[0] 

1796 

1797 raise NoSuchExample(get_pretty_function_description(condition))
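# Illustrative sketch, not part of core.py: the behaviour documented in
# the docstring above, using only the public find API. Shrinking drives
# the result to the smallest satisfying example.
from hypothesis import find as _find, strategies as _st

smallest = _find(_st.integers(min_value=0), lambda x: x > 10)
assert smallest == 11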