Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/core.py: 35%


804 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""This module provides the core primitives of Hypothesis, such as given.""" 

12import base64 

13import contextlib 

14import dataclasses 

15import datetime 

16import inspect 

17import io 

18import math 

19import os 

20import sys 

21import threading 

22import time 

23import traceback 

24import types 

25import unittest 

26import warnings 

27import zlib 

28from collections import defaultdict 

29from collections.abc import Coroutine, Generator, Hashable, Iterable, Sequence 

30from dataclasses import dataclass, field 

31from functools import partial 

32from inspect import Parameter 

33from random import Random 

34from threading import Lock 

35from typing import ( 

36 Any, 

37 BinaryIO, 

38 Callable, 

39 Optional, 

40 TypeVar, 

41 Union, 

42 overload, 

43) 

44from unittest import TestCase 

45 

46from hypothesis import strategies as st 

47from hypothesis._settings import ( 

48 HealthCheck, 

49 Phase, 

50 Verbosity, 

51 all_settings, 

52 local_settings, 

53 settings as Settings, 

54) 

55from hypothesis.control import BuildContext, currently_in_test_context 

56from hypothesis.database import choices_from_bytes, choices_to_bytes 

57from hypothesis.errors import ( 

58 BackendCannotProceed, 

59 DeadlineExceeded, 

60 DidNotReproduce, 

61 FailedHealthCheck, 

62 FlakyFailure, 

63 FlakyReplay, 

64 Found, 

65 Frozen, 

66 HypothesisException, 

67 HypothesisWarning, 

68 InvalidArgument, 

69 NoSuchExample, 

70 StopTest, 

71 Unsatisfiable, 

72 UnsatisfiedAssumption, 

73) 

74from hypothesis.internal import observability 

75from hypothesis.internal.compat import ( 

76 PYPY, 

77 BaseExceptionGroup, 

78 EllipsisType, 

79 add_note, 

80 bad_django_TestCase, 

81 get_type_hints, 

82 int_from_bytes, 

83) 

84from hypothesis.internal.conjecture.choice import ChoiceT 

85from hypothesis.internal.conjecture.data import ConjectureData, Status 

86from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner 

87from hypothesis.internal.conjecture.junkdrawer import ( 

88 ensure_free_stackframes, 

89 gc_cumulative_time, 

90) 

91from hypothesis.internal.conjecture.providers import ( 

92 BytestringProvider, 

93 PrimitiveProvider, 

94) 

95from hypothesis.internal.conjecture.shrinker import sort_key 

96from hypothesis.internal.entropy import deterministic_PRNG 

97from hypothesis.internal.escalation import ( 

98 InterestingOrigin, 

99 current_pytest_item, 

100 format_exception, 

101 get_trimmed_traceback, 

102 is_hypothesis_file, 

103) 

104from hypothesis.internal.healthcheck import fail_health_check 

105from hypothesis.internal.observability import ( 

106 InfoObservation, 

107 InfoObservationType, 

108 deliver_observation, 

109 make_testcase, 

110 observability_enabled, 

111) 

112from hypothesis.internal.reflection import ( 

113 convert_positional_arguments, 

114 define_function_signature, 

115 function_digest, 

116 get_pretty_function_description, 

117 get_signature, 

118 impersonate, 

119 is_mock, 

120 nicerepr, 

121 proxies, 

122 repr_call, 

123) 

124from hypothesis.internal.scrutineer import ( 

125 MONITORING_TOOL_ID, 

126 Trace, 

127 Tracer, 

128 explanatory_lines, 

129 tractable_coverage_report, 

130) 

131from hypothesis.internal.validation import check_type 

132from hypothesis.reporting import ( 

133 current_verbosity, 

134 report, 

135 verbose_report, 

136 with_reporter, 

137) 

138from hypothesis.statistics import describe_statistics, describe_targets, note_statistics 

139from hypothesis.strategies._internal.misc import NOTHING 

140from hypothesis.strategies._internal.strategies import ( 

141 Ex, 

142 SearchStrategy, 

143 check_strategy, 

144) 

145from hypothesis.utils.conventions import not_set 

146from hypothesis.utils.threading import ThreadLocal 

147from hypothesis.vendor.pretty import RepresentationPrinter 

148from hypothesis.version import __version__ 

149 

150TestFunc = TypeVar("TestFunc", bound=Callable) 

151 

152 

153running_under_pytest = False 

154pytest_shows_exceptiongroups = True 

155global_force_seed = None 

156# `threadlocal` stores "engine-global" constants, which are global relative to a 

157# ConjectureRunner instance (roughly speaking). Since only one conjecture runner 

158# instance can be active per thread, making engine constants thread-local prevents 

159# the ConjectureRunner instances of concurrent threads from treading on each other. 

160threadlocal = ThreadLocal(_hypothesis_global_random=lambda: None) 

161 

162 

163@dataclass 

164class Example: 

165 args: Any 

166 kwargs: Any 

167 # Plus two optional arguments for .xfail() 

168 raises: Any = field(default=None) 

169 reason: Any = field(default=None) 

170 

171 

172# TODO_DOCS link to not-yet-existent patch-dumping docs 

173 

174 

175class example: 

176 """ 

177 Add an explicit input to a Hypothesis test, which Hypothesis will always 

178 try before generating random inputs. This combines the randomized nature of 

179 Hypothesis generation with a traditional parametrized test. 

180 

181 For example: 

182 

183 .. code-block:: python 

184 

185 @example("Hello world") 

186 @example("some string with special significance") 

187 @given(st.text()) 

188 def test_strings(s): 

189 pass 

190 

191 will call ``test_strings("Hello world")`` and 

192 ``test_strings("some string with special significance")`` before generating 

193 any random inputs. |@example| may be placed in any order relative to |@given| 

194 and |@settings|. 

195 

196 Explicit inputs from |@example| are run in the |Phase.explicit| phase. 

197 Explicit inputs do not count towards |settings.max_examples|. Note that 

198 explicit inputs added by |@example| do not shrink. If an explicit input 

199 fails, Hypothesis will stop and report the failure without generating any 

200 random inputs. 

201 

202 |@example| can also be used to easily reproduce a failure. For instance, if 

203 Hypothesis reports that ``f(n=[0, math.nan])`` fails, you can add 

204 ``@example(n=[0, math.nan])`` to your test to quickly reproduce that failure. 

205 

206 Arguments to ``@example`` 

207 ------------------------- 

208 

209 Arguments to |@example| have the same behavior and restrictions as arguments 

210 to |@given|. This means they may be either positional or keyword arguments 

211 (but not both in the same |@example|): 

212 

213 .. code-block:: python 

214 

215 @example(1, 2) 

216 @example(x=1, y=2) 

217 @given(st.integers(), st.integers()) 

218 def test(x, y): 

219 pass 

220 

221 Note that while arguments to |@given| are strategies (like |st.integers|), 

222 arguments to |@example| are values instead (like ``1``). 

223 

224 See the :ref:`given-arguments` section for full details. 

225 """ 

226 

227 def __init__(self, *args: Any, **kwargs: Any) -> None: 

228 if args and kwargs: 

229 raise InvalidArgument( 

230 "Cannot mix positional and keyword arguments for examples" 

231 ) 

232 if not (args or kwargs): 

233 raise InvalidArgument("An example must provide at least one argument") 

234 

235 self.hypothesis_explicit_examples: list[Example] = [] 

236 self._this_example = Example(tuple(args), kwargs) 

237 

238 def __call__(self, test: TestFunc) -> TestFunc: 

239 if not hasattr(test, "hypothesis_explicit_examples"): 

240 test.hypothesis_explicit_examples = self.hypothesis_explicit_examples # type: ignore 

241 test.hypothesis_explicit_examples.append(self._this_example) # type: ignore 

242 return test 

243 

244 def xfail( 

245 self, 

246 condition: bool = True, # noqa: FBT002 

247 *, 

248 reason: str = "", 

249 raises: Union[ 

250 type[BaseException], tuple[type[BaseException], ...] 

251 ] = BaseException, 

252 ) -> "example": 

253 """Mark this example as an expected failure, similarly to 

254 :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`. 

255 

256 Expected-failing examples allow you to check that your test does fail on 

257 some examples, and therefore build confidence that *passing* tests are 

258 because your code is working, not because the test is missing something. 

259 

260 .. code-block:: python 

261 

262 @example(...).xfail() 

263 @example(...).xfail(reason="Prices must be non-negative") 

264 @example(...).xfail(raises=(KeyError, ValueError)) 

265 @example(...).xfail(sys.version_info[:2] >= (3, 12), reason="needs py 3.12") 

266 @example(...).xfail(condition=sys.platform != "linux", raises=OSError) 

267 def test(x): 

268 pass 

269 

270 .. note:: 

271 

272 Expected-failing examples are handled separately from those generated 

273 by strategies, so you should usually ensure that there is no overlap. 

274 

275 .. code-block:: python 

276 

277 @example(x=1, y=0).xfail(raises=ZeroDivisionError) 

278 @given(x=st.just(1), y=st.integers()) # Missing `.filter(bool)`! 

279 def test_fraction(x, y): 

280 # This test will try the explicit example and see it fail as 

281 # expected, then go on to generate more examples from the 

282 # strategy. If we happen to generate y=0, the test will fail 

283 # because only the explicit example is treated as xfailing. 

284 x / y 

285 """ 

286 check_type(bool, condition, "condition") 

287 check_type(str, reason, "reason") 

288 if not ( 

289 isinstance(raises, type) and issubclass(raises, BaseException) 

290 ) and not ( 

291 isinstance(raises, tuple) 

292 and raises # () -> expected to fail with no error, which is impossible 

293 and all( 

294 isinstance(r, type) and issubclass(r, BaseException) for r in raises 

295 ) 

296 ): 

297 raise InvalidArgument( 

298 f"{raises=} must be an exception type or tuple of exception types" 

299 ) 

300 if condition: 

301 self._this_example = dataclasses.replace( 

302 self._this_example, raises=raises, reason=reason 

303 ) 

304 return self 

305 

306 def via(self, whence: str, /) -> "example": 

307 """Attach a machine-readable label noting what the origin of this example 

308 was. |example.via| is completely optional and does not change runtime 

309 behavior. 

310 

311 |example.via| is intended to support self-documenting behavior, as well as 

312 tooling which might add (or remove) |@example| decorators automatically. 

313 For example: 

314 

315 .. code-block:: python 

316 

317 # Annotating examples is optional and does not change runtime behavior 

318 @example(...) 

319 @example(...).via("regression test for issue #42") 

320 @example(...).via("discovered failure") 

321 def test(x): 

322 pass 

323 

324 .. note:: 

325 

326 `HypoFuzz <https://hypofuzz.com/>`_ uses |example.via| to tag examples 

327 in the patch of its high-coverage set of explicit inputs, on 

328 `the patches page <https://hypofuzz.com/example-dashboard/#/patches>`_. 

329 """ 

330 if not isinstance(whence, str): 

331 raise InvalidArgument(".via() must be passed a string") 

332 # This is deliberately a no-op at runtime; the tools operate on source code. 

333 return self 

334 

335 
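
Because .xfail() and .via() each return the example instance, they can be
chained on a single decorator. A small sketch (the property and strategy here
are illustrative, not taken from this file):

    import math
    from hypothesis import example, given, strategies as st

    @example(n=-1).xfail(raises=ValueError).via("regression: negative input")
    @given(st.integers(min_value=0))
    def test_sqrt_accepts_non_negative(n):
        # Raises ValueError only for the xfailed explicit example above;
        # the strategy itself never generates negative values.
        math.sqrt(n)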

336def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]: 

337 """ 

338 Seed the randomness for this test. 

339 

340 ``seed`` may be any hashable object. No exact meaning for ``seed`` is provided 

341 other than that for a fixed seed value Hypothesis will produce the same 

342 examples (assuming that there are no other sources of nondeterminism, such 

343 as timing, hash randomization, or external state). 

344 

345 For example, the following test function and |RuleBasedStateMachine| will 

346 each generate the same series of examples each time they are executed: 

347 

348 .. code-block:: python 

349 

350 @seed(1234) 

351 @given(st.integers()) 

352 def test(n): ... 

353 

354 @seed(6789) 

355 class MyMachine(RuleBasedStateMachine): ... 

356 

357 If using pytest, you can alternatively pass ``--hypothesis-seed`` on the 

358 command line. 

359 

360 Setting a seed overrides |settings.derandomize|, which is designed to enable 

361 deterministic CI tests rather than reproducing observed failures. 

362 

363 Hypothesis will only print the seed which would reproduce a failure if a test 

364 fails in an unexpected way, for instance inside Hypothesis internals. 

365 """ 

366 

367 def accept(test): 

368 test._hypothesis_internal_use_seed = seed 

369 current_settings = getattr(test, "_hypothesis_internal_use_settings", None) 

370 test._hypothesis_internal_use_settings = Settings( 

371 current_settings, database=None 

372 ) 

373 return test 

374 

375 return accept 

376 

377 

378# TODO_DOCS: link to /explanation/choice-sequence 

379 

380 

381def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]: 

382 """ 

383 Run the example corresponding to the binary ``blob`` in order to reproduce a 

384 failure. ``blob`` is a serialized version of the internal input representation 

385 of Hypothesis. 

386 

387 A test decorated with |@reproduce_failure| always runs exactly one example, 

388 which is expected to cause a failure. If the provided ``blob`` does not 

389 cause a failure, Hypothesis will raise |DidNotReproduce|. 

390 

391 Hypothesis will print an |@reproduce_failure| decorator if 

392 |settings.print_blob| is ``True`` (which is the default in CI). 

393 

394 |@reproduce_failure| is intended to be temporarily added to your test suite in 

395 order to reproduce a failure. It is not intended to be a permanent addition to 

396 your test suite. Because of this, no compatibility guarantees are made across 

397 Hypothesis versions, and |@reproduce_failure| will error if used on a different 

398 Hypothesis version than it was created for. 

399 

400 .. seealso:: 

401 

402 See also the :doc:`/tutorial/replaying-failures` tutorial. 

403 """ 

404 

405 def accept(test): 

406 test._hypothesis_internal_use_reproduce_failure = (version, blob) 

407 return test 

408 

409 return accept 

410 

411 
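
For illustration, the decorator is applied exactly as Hypothesis prints it after
a failure; the version string and blob below are placeholders rather than a real
serialized example:

    from hypothesis import given, reproduce_failure, strategies as st

    @reproduce_failure("6.x.y", b"AXic...")  # placeholder version and blob
    @given(st.lists(st.integers()))
    def test_sum_is_small(ls):
        assert sum(ls) < 1000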

412def reproduction_decorator(choices: Iterable[ChoiceT]) -> str: 

413 return f"@reproduce_failure({__version__!r}, {encode_failure(choices)!r})" 

414 

415 

416def encode_failure(choices: Iterable[ChoiceT]) -> bytes: 

417 blob = choices_to_bytes(choices) 

418 compressed = zlib.compress(blob) 

419 if len(compressed) < len(blob): 

420 blob = b"\1" + compressed 

421 else: 

422 blob = b"\0" + blob 

423 return base64.b64encode(blob) 

424 

425 

426def decode_failure(blob: bytes) -> Sequence[ChoiceT]: 

427 try: 

428 decoded = base64.b64decode(blob) 

429 except Exception: 

430 raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None 

431 

432 prefix = decoded[:1] 

433 if prefix == b"\0": 

434 decoded = decoded[1:] 

435 elif prefix == b"\1": 

436 try: 

437 decoded = zlib.decompress(decoded[1:]) 

438 except zlib.error as err: 

439 raise InvalidArgument( 

440 f"Invalid zlib compression for blob {blob!r}" 

441 ) from err 

442 else: 

443 raise InvalidArgument( 

444 f"Could not decode blob {blob!r}: Invalid start byte {prefix!r}" 

445 ) 

446 

447 choices = choices_from_bytes(decoded) 

448 if choices is None: 

449 raise InvalidArgument(f"Invalid serialized choice sequence for blob {blob!r}") 

450 

451 return choices 

452 

453 
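
The two helpers above are inverses: the choice sequence is serialized with
choices_to_bytes, zlib-compressed only when that actually makes it smaller,
tagged with a one-byte prefix recording which branch was taken, and
base64-encoded. A minimal round-trip sketch (the choice values are arbitrary):

    choices = [5, b"abc", 0.25, "some text", True]
    blob = encode_failure(choices)   # ASCII-safe bytes, as embedded in @reproduce_failure
    assert list(decode_failure(blob)) == choices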

454def _invalid(message, *, exc=InvalidArgument, test, given_kwargs): 

455 @impersonate(test) 

456 def wrapped_test(*arguments, **kwargs): # pragma: no cover # coverage limitation 

457 raise exc(message) 

458 

459 wrapped_test.is_hypothesis_test = True 

460 wrapped_test.hypothesis = HypothesisHandle( 

461 inner_test=test, 

462 _get_fuzz_target=wrapped_test, 

463 _given_kwargs=given_kwargs, 

464 ) 

465 return wrapped_test 

466 

467 

468def is_invalid_test(test, original_sig, given_arguments, given_kwargs): 

469 """Check the arguments to ``@given`` for basic usage constraints. 

470 

471 Most errors are not raised immediately; instead we return a dummy test 

472 function that will raise the appropriate error if it is actually called. 

473 When the user runs a subset of tests (e.g. via ``pytest -k``), errors will 

474 only be reported for tests that actually ran. 

475 """ 

476 invalid = partial(_invalid, test=test, given_kwargs=given_kwargs) 

477 

478 if not (given_arguments or given_kwargs): 

479 return invalid("given must be called with at least one argument") 

480 

481 params = list(original_sig.parameters.values()) 

482 pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD] 

483 kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY] 

484 if given_arguments and params != pos_params: 

485 return invalid( 

486 "positional arguments to @given are not supported with varargs, " 

487 "varkeywords, positional-only, or keyword-only arguments" 

488 ) 

489 

490 if len(given_arguments) > len(pos_params): 

491 return invalid( 

492 f"Too many positional arguments for {test.__name__}() were passed to " 

493 f"@given - expected at most {len(pos_params)} " 

494 f"arguments, but got {len(given_arguments)} {given_arguments!r}" 

495 ) 

496 

497 if ... in given_arguments: 

498 return invalid( 

499 "... was passed as a positional argument to @given, but may only be " 

500 "passed as a keyword argument or as the sole argument of @given" 

501 ) 

502 

503 if given_arguments and given_kwargs: 

504 return invalid("cannot mix positional and keyword arguments to @given") 

505 extra_kwargs = [ 

506 k for k in given_kwargs if k not in {p.name for p in pos_params + kwonly_params} 

507 ] 

508 if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD): 

509 arg = extra_kwargs[0] 

510 extra = "" 

511 if arg in all_settings: 

512 extra = f". Did you mean @settings({arg}={given_kwargs[arg]!r})?" 

513 return invalid( 

514 f"{test.__name__}() got an unexpected keyword argument {arg!r}, " 

515 f"from `{arg}={given_kwargs[arg]!r}` in @given{extra}" 

516 ) 

517 if any(p.default is not p.empty for p in params): 

518 return invalid("Cannot apply @given to a function with defaults.") 

519 

520 # This case would raise Unsatisfiable *anyway*, but by detecting it here we can 

521 # provide a much more helpful error message for people e.g. using the Ghostwriter. 

522 empty = [ 

523 f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING 

524 ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING] 

525 if empty: 

526 strats = "strategies" if len(empty) > 1 else "strategy" 

527 return invalid( 

528 f"Cannot generate examples from empty {strats}: " + ", ".join(empty), 

529 exc=Unsatisfiable, 

530 ) 

531 

532 
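
These checks surface common mistakes lazily, when the affected test is actually
run. Two of the guarded cases, sketched:

    @given(st.integers(), y=st.integers())   # mixing positional and keyword arguments
    def test_mixed(x, y): ...

    @given(st.integers())                    # @given cannot be applied over default values
    def test_defaulted(x=0): ...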

533def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig): 

534 assert isinstance(state, StateForActualGivenExecution) 

535 posargs = [ 

536 p.name 

537 for p in original_sig.parameters.values() 

538 if p.kind is p.POSITIONAL_OR_KEYWORD 

539 ] 

540 

541 for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())): 

542 assert isinstance(example, Example) 

543 # All of this validation is to check that @example() got "the same" arguments 

544 # as @given, i.e. corresponding to the same parameters, even though they might 

545 # be any mixture of positional and keyword arguments. 

546 if example.args: 

547 assert not example.kwargs 

548 if any( 

549 p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values() 

550 ): 

551 raise InvalidArgument( 

552 "Cannot pass positional arguments to @example() when decorating " 

553 "a test function which has positional-only parameters." 

554 ) 

555 if len(example.args) > len(posargs): 

556 raise InvalidArgument( 

557 "example has too many arguments for test. Expected at most " 

558 f"{len(posargs)} but got {len(example.args)}" 

559 ) 

560 example_kwargs = dict(zip(posargs[-len(example.args) :], example.args)) 

561 else: 

562 example_kwargs = dict(example.kwargs) 

563 given_kws = ", ".join( 

564 repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs) 

565 ) 

566 example_kws = ", ".join(repr(k) for k in sorted(example_kwargs)) 

567 if given_kws != example_kws: 

568 raise InvalidArgument( 

569 f"Inconsistent args: @given() got strategies for {given_kws}, " 

570 f"but @example() got arguments for {example_kws}" 

571 ) from None 

572 

573 # This is certainly true because the example_kwargs exactly match the params 

574 # reserved by @given(), which are then removed from the function signature. 

575 assert set(example_kwargs).isdisjoint(kwargs) 

576 example_kwargs.update(kwargs) 

577 

578 if Phase.explicit not in state.settings.phases: 

579 continue 

580 

581 with local_settings(state.settings): 

582 fragments_reported = [] 

583 empty_data = ConjectureData.for_choices([]) 

584 try: 

585 execute_example = partial( 

586 state.execute_once, 

587 empty_data, 

588 is_final=True, 

589 print_example=True, 

590 example_kwargs=example_kwargs, 

591 ) 

592 with with_reporter(fragments_reported.append): 

593 if example.raises is None: 

594 execute_example() 

595 else: 

596 # @example(...).xfail(...) 

597 bits = ", ".join(nicerepr(x) for x in arguments) + ", ".join( 

598 f"{k}={nicerepr(v)}" for k, v in example_kwargs.items() 

599 ) 

600 try: 

601 execute_example() 

602 except failure_exceptions_to_catch() as err: 

603 if not isinstance(err, example.raises): 

604 raise 

605 # Save a string form of this example; we'll warn if it's 

606 # ever generated by the strategy (which can't be xfailed) 

607 state.xfail_example_reprs.add( 

608 repr_call(state.test, arguments, example_kwargs) 

609 ) 

610 except example.raises as err: 

611 # We'd usually check this as early as possible, but it's 

612 # possible for failure_exceptions_to_catch() to grow when 

613 # e.g. pytest is imported between import- and test-time. 

614 raise InvalidArgument( 

615 f"@example({bits}) raised an expected {err!r}, " 

616 "but Hypothesis does not treat this as a test failure" 

617 ) from err 

618 else: 

619 # Unexpectedly passing; always raise an error in this case. 

620 reason = f" because {example.reason}" * bool(example.reason) 

621 if example.raises is BaseException: 

622 name = "exception" # special-case no raises= arg 

623 elif not isinstance(example.raises, tuple): 

624 name = example.raises.__name__ 

625 elif len(example.raises) == 1: 

626 name = example.raises[0].__name__ 

627 else: 

628 name = ( 

629 ", ".join(ex.__name__ for ex in example.raises[:-1]) 

630 + f", or {example.raises[-1].__name__}" 

631 ) 

632 vowel = name.upper()[0] in "AEIOU" 

633 raise AssertionError( 

634 f"Expected a{'n' * vowel} {name} from @example({bits})" 

635 f"{reason}, but no exception was raised." 

636 ) 

637 except UnsatisfiedAssumption: 

638 # Odd though it seems, we deliberately support explicit examples that 

639 # are then rejected by a call to `assume()`. As well as iterative 

640 # development, this is rather useful to replay Hypothesis' part of 

641 # a saved failure when other arguments are supplied by e.g. pytest. 

642 # See https://github.com/HypothesisWorks/hypothesis/issues/2125 

643 with contextlib.suppress(StopTest): 

644 empty_data.conclude_test(Status.INVALID) 

645 except BaseException as err: 

646 # In order to support reporting of multiple failing examples, we yield 

647 # each of the (report text, error) pairs we find back to the top-level 

648 # runner. This also ensures that user-facing stack traces have as few 

649 # frames of Hypothesis internals as possible. 

650 err = err.with_traceback(get_trimmed_traceback()) 

651 

652 # One user error - whether misunderstanding or typo - we've seen a few 

653 # times is to pass strategies to @example() where values are expected. 

654 # Checking is easy, and false-positives not much of a problem, so: 

655 if isinstance(err, failure_exceptions_to_catch()) and any( 

656 isinstance(arg, SearchStrategy) 

657 for arg in example.args + tuple(example.kwargs.values()) 

658 ): 

659 new = HypothesisWarning( 

660 "The @example() decorator expects to be passed values, but " 

661 "you passed strategies instead. See https://hypothesis." 

662 "readthedocs.io/en/latest/reference/api.html#hypothesis" 

663 ".example for details." 

664 ) 

665 new.__cause__ = err 

666 err = new 

667 

668 with contextlib.suppress(StopTest): 

669 empty_data.conclude_test(Status.INVALID) 

670 yield (fragments_reported, err) 

671 if ( 

672 state.settings.report_multiple_bugs 

673 and pytest_shows_exceptiongroups 

674 and isinstance(err, failure_exceptions_to_catch()) 

675 and not isinstance(err, skip_exceptions_to_reraise()) 

676 ): 

677 continue 

678 break 

679 finally: 

680 if fragments_reported: 

681 assert fragments_reported[0].startswith("Falsifying example") 

682 fragments_reported[0] = fragments_reported[0].replace( 

683 "Falsifying example", "Falsifying explicit example", 1 

684 ) 

685 

686 empty_data.freeze() 

687 tc = make_testcase( 

688 run_start=state._start_timestamp, 

689 property=state.test_identifier, 

690 data=empty_data, 

691 how_generated="explicit example", 

692 representation=state._string_repr, 

693 timing=state._timing_features, 

694 ) 

695 deliver_observation(tc) 

696 

697 if fragments_reported: 

698 verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1)) 

699 for f in fragments_reported[1:]: 

700 verbose_report(f) 

701 

702 

703def get_random_for_wrapped_test(test, wrapped_test): 

704 settings = wrapped_test._hypothesis_internal_use_settings 

705 wrapped_test._hypothesis_internal_use_generated_seed = None 

706 

707 if wrapped_test._hypothesis_internal_use_seed is not None: 

708 return Random(wrapped_test._hypothesis_internal_use_seed) 

709 elif settings.derandomize: 

710 return Random(int_from_bytes(function_digest(test))) 

711 elif global_force_seed is not None: 

712 return Random(global_force_seed) 

713 else: 

714 if threadlocal._hypothesis_global_random is None: # pragma: no cover 

715 threadlocal._hypothesis_global_random = Random() 

716 seed = threadlocal._hypothesis_global_random.getrandbits(128) 

717 wrapped_test._hypothesis_internal_use_generated_seed = seed 

718 return Random(seed) 

719 

720 
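
The branches above encode a precedence order for the source of randomness: an
explicit @seed wins, then settings(derandomize=True) (which derives a seed from
the test's digest), then a process-wide forced seed, and finally a fresh 128-bit
seed drawn from a thread-local Random and stashed on the wrapped test so it can
be reported if the test fails unexpectedly. A sketch of the two highest-priority
sources:

    from hypothesis import given, seed, settings, strategies as st

    @seed(1234)                    # takes priority over derandomize
    @settings(derandomize=True)
    @given(st.integers())
    def test_reproducible(n): ...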

721@dataclass 

722class Stuff: 

723 selfy: Any 

724 args: tuple 

725 kwargs: dict 

726 given_kwargs: dict 

727 

728 

729def process_arguments_to_given( 

730 wrapped_test: Any, 

731 arguments: Sequence[object], 

732 kwargs: dict[str, object], 

733 given_kwargs: dict[str, SearchStrategy], 

734 params: dict[str, Parameter], 

735) -> tuple[Sequence[object], dict[str, object], Stuff]: 

736 selfy = None 

737 arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs) 

738 

739 # If the test function is a method of some kind, the bound object 

740 # will be the first named argument if there are any, otherwise the 

741 # first vararg (if any). 

742 posargs = [p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD] 

743 if posargs: 

744 selfy = kwargs.get(posargs[0]) 

745 elif arguments: 

746 selfy = arguments[0] 

747 

748 # Ensure that we don't mistake mocks for self here. 

749 # This can cause the mock to be used as the test runner. 

750 if is_mock(selfy): 

751 selfy = None 

752 

753 arguments = tuple(arguments) 

754 

755 with ensure_free_stackframes(): 

756 for k, s in given_kwargs.items(): 

757 check_strategy(s, name=k) 

758 s.validate() 

759 

760 stuff = Stuff(selfy=selfy, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs) 

761 

762 return arguments, kwargs, stuff 

763 

764 

765def skip_exceptions_to_reraise(): 

766 """Return a tuple of exceptions meaning 'skip this test', to re-raise. 

767 

768 This is intended to cover most common test runners; if you would 

769 like another to be added please open an issue or pull request adding 

770 it to this function and to tests/cover/test_lazy_import.py 

771 """ 

772 # This is a set because nose may simply re-export unittest.SkipTest 

773 exceptions = set() 

774 # We use this sys.modules trick to avoid importing libraries - 

775 # you can't be an instance of a type from an unimported module! 

776 # This is fast enough that we don't need to cache the result, 

777 # and more importantly it avoids possible side-effects :-) 

778 if "unittest" in sys.modules: 

779 exceptions.add(sys.modules["unittest"].SkipTest) 

780 if "unittest2" in sys.modules: 

781 exceptions.add(sys.modules["unittest2"].SkipTest) 

782 if "nose" in sys.modules: 

783 exceptions.add(sys.modules["nose"].SkipTest) 

784 if "_pytest.outcomes" in sys.modules: 

785 exceptions.add(sys.modules["_pytest.outcomes"].Skipped) 

786 return tuple(sorted(exceptions, key=str)) 

787 

788 

789def failure_exceptions_to_catch() -> tuple[type[BaseException], ...]: 

790 """Return a tuple of exceptions meaning 'this test has failed', to catch. 

791 

792 This is intended to cover most common test runners; if you would 

793 like another to be added please open an issue or pull request. 

794 """ 

795 # While SystemExit and GeneratorExit are instances of BaseException, we also 

796 # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat 

797 # them as standard exceptions, check for flakiness, etc. 

798 # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details. 

799 exceptions = [Exception, SystemExit, GeneratorExit] 

800 if "_pytest.outcomes" in sys.modules: 

801 exceptions.append(sys.modules["_pytest.outcomes"].Failed) 

802 return tuple(exceptions) 

803 

804 

805def new_given_signature(original_sig, given_kwargs): 

806 """Make an updated signature for the wrapped test.""" 

807 return original_sig.replace( 

808 parameters=[ 

809 p 

810 for p in original_sig.parameters.values() 

811 if not ( 

812 p.name in given_kwargs 

813 and p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY) 

814 ) 

815 ], 

816 return_annotation=None, 

817 ) 

818 

819 
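
The effect is that parameters supplied by @given disappear from the wrapped
test's public signature, leaving only what callers (such as pytest fixtures)
still have to provide. A small sketch of the transformation:

    import inspect

    def test(self, fixture, x, y): ...

    sig = new_given_signature(inspect.signature(test), {"x": ..., "y": ...})
    assert list(sig.parameters) == ["self", "fixture"]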

820def default_executor(data, function): 

821 return function(data) 

822 

823 

824def get_executor(runner): 

825 try: 

826 execute_example = runner.execute_example 

827 except AttributeError: 

828 pass 

829 else: 

830 return lambda data, function: execute_example(partial(function, data)) 

831 

832 if hasattr(runner, "setup_example") or hasattr(runner, "teardown_example"): 

833 setup = getattr(runner, "setup_example", None) or (lambda: None) 

834 teardown = getattr(runner, "teardown_example", None) or (lambda ex: None) 

835 

836 def execute(data, function): 

837 token = None 

838 try: 

839 token = setup() 

840 return function(data) 

841 finally: 

842 teardown(token) 

843 

844 return execute 

845 

846 return default_executor 

847 

848 
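
get_executor looks for an execute_example method first, and otherwise for
setup_example/teardown_example hooks on the object the test is bound to, falling
back to calling the test directly. A sketch of the setup/teardown form on a
unittest runner (the per-example state is illustrative):

    class TestWithPerExampleState(unittest.TestCase):
        def setup_example(self):
            self.items = []     # fresh state before each generated example
            return self.items   # whatever setup returns is passed to teardown_example

        def teardown_example(self, token):
            token.clear()

        @given(st.integers())
        def test_single_append(self, n):
            self.items.append(n)
            assert self.items == [n]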

849@contextlib.contextmanager 

850def unwrap_markers_from_group() -> Generator[None, None, None]: 

851 # This function is a crude solution; a better way of resolving it would probably 

852 # be to rewrite a bunch of exception handlers to use except*. 

853 T = TypeVar("T", bound=BaseException) 

854 

855 def _flatten_group(excgroup: BaseExceptionGroup[T]) -> list[T]: 

856 found_exceptions: list[T] = [] 

857 for exc in excgroup.exceptions: 

858 if isinstance(exc, BaseExceptionGroup): 

859 found_exceptions.extend(_flatten_group(exc)) 

860 else: 

861 found_exceptions.append(exc) 

862 return found_exceptions 

863 

864 try: 

865 yield 

866 except BaseExceptionGroup as excgroup: 

867 frozen_exceptions, non_frozen_exceptions = excgroup.split(Frozen) 

868 

869 # The group only contains Frozen; reraise the group. 

870 # It doesn't matter what we raise, since any exceptions get disregarded 

871 # and reraised as StopTest if data got frozen. 

872 if non_frozen_exceptions is None: 

873 raise 

874 # in all other cases they are discarded 

875 

876 # Can RewindRecursive end up in this group? 

877 _, user_exceptions = non_frozen_exceptions.split( 

878 lambda e: isinstance(e, (StopTest, HypothesisException)) 

879 ) 

880 

881 # this might contain marker exceptions, or internal errors, but not frozen. 

882 if user_exceptions is not None: 

883 raise 

884 

885 # single marker exception - reraise it 

886 flattened_non_frozen_exceptions: list[BaseException] = _flatten_group( 

887 non_frozen_exceptions 

888 ) 

889 if len(flattened_non_frozen_exceptions) == 1: 

890 e = flattened_non_frozen_exceptions[0] 

891 # preserve the cause of the original exception to not hinder debugging 

892 # note that __context__ is still lost though 

893 raise e from e.__cause__ 

894 

895 # multiple marker exceptions. Re-raising the whole group would break 

896 # a bunch of internal logic, so we pick a single exception to raise instead. 

897 stoptests, non_stoptests = non_frozen_exceptions.split(StopTest) 

898 

899 # TODO: stoptest+hypothesisexception ...? Is it possible? If so, what do? 

900 

901 if non_stoptests: 

902 # TODO: multiple marker exceptions are easy to produce, but the logic in the 

903 # engine does not handle it... so we just reraise the first one for now. 

904 e = _flatten_group(non_stoptests)[0] 

905 raise e from e.__cause__ 

906 assert stoptests is not None 

907 

908 # multiple stoptests: raising the one with the lowest testcounter 

909 raise min(_flatten_group(stoptests), key=lambda s_e: s_e.testcounter) 

910 

911 

912class StateForActualGivenExecution: 

913 def __init__( 

914 self, stuff, test, settings, random, wrapped_test, *, thread_overlap=None 

915 ): 

916 self.stuff = stuff 

917 self.test = test 

918 self.settings = settings 

919 self.random = random 

920 self.wrapped_test = wrapped_test 

921 self.thread_overlap = {} if thread_overlap is None else thread_overlap 

922 

923 self.test_runner = get_executor(stuff.selfy) 

924 self.is_find = getattr(wrapped_test, "_hypothesis_internal_is_find", False) 

925 self.print_given_args = getattr( 

926 wrapped_test, "_hypothesis_internal_print_given_args", True 

927 ) 

928 

929 self.last_exception = None 

930 self.falsifying_examples = () 

931 self.ever_executed = False 

932 self.xfail_example_reprs = set() 

933 self.files_to_propagate = set() 

934 self.failed_normally = False 

935 self.failed_due_to_deadline = False 

936 

937 self.explain_traces = defaultdict(set) 

938 self._start_timestamp = time.time() 

939 self._string_repr = "" 

940 self._timing_features = {} 

941 

942 @property 

943 def test_identifier(self): 

944 return getattr( 

945 current_pytest_item.value, "nodeid", None 

946 ) or get_pretty_function_description(self.wrapped_test) 

947 

948 def _should_trace(self): 

949 # NOTE: we explicitly support monkeypatching this. Keep the namespace 

950 # access intact. 

951 _trace_obs = ( 

952 observability_enabled() and observability.OBSERVABILITY_COLLECT_COVERAGE 

953 ) 

954 _trace_failure = ( 

955 self.failed_normally 

956 and not self.failed_due_to_deadline 

957 and {Phase.shrink, Phase.explain}.issubset(self.settings.phases) 

958 ) 

959 return _trace_obs or _trace_failure 

960 

961 def execute_once( 

962 self, 

963 data, 

964 *, 

965 print_example=False, 

966 is_final=False, 

967 expected_failure=None, 

968 example_kwargs=None, 

969 ): 

970 """Run the test function once, using ``data`` as input. 

971 

972 If the test raises an exception, it will propagate through to the 

973 caller of this method. Depending on its type, this could represent 

974 an ordinary test failure, or a fatal error, or a control exception. 

975 

976 If this method returns normally, the test might have passed, or 

977 it might have placed ``data`` in an unsuccessful state and then 

978 swallowed the corresponding control exception. 

979 """ 

980 

981 self.ever_executed = True 

982 data.is_find = self.is_find 

983 

984 self._string_repr = "" 

985 text_repr = None 

986 if self.settings.deadline is None and not observability_enabled(): 

987 

988 @proxies(self.test) 

989 def test(*args, **kwargs): 

990 with unwrap_markers_from_group(): 

991 # NOTE: For compatibility with Python 3.9's LL(1) 

992 # parser, this is written as a nested with-statement, 

993 # instead of a compound one. 

994 with ensure_free_stackframes(): 

995 return self.test(*args, **kwargs) 

996 

997 else: 

998 

999 @proxies(self.test) 

1000 def test(*args, **kwargs): 

1001 arg_drawtime = math.fsum(data.draw_times.values()) 

1002 arg_stateful = math.fsum(data._stateful_run_times.values()) 

1003 arg_gctime = gc_cumulative_time() 

1004 start = time.perf_counter() 

1005 try: 

1006 with unwrap_markers_from_group(): 

1007 # NOTE: For compatibility with Python 3.9's LL(1) 

1008 # parser, this is written as a nested with-statement, 

1009 # instead of a compound one. 

1010 with ensure_free_stackframes(): 

1011 result = self.test(*args, **kwargs) 

1012 finally: 

1013 finish = time.perf_counter() 

1014 in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime 

1015 in_stateful = ( 

1016 math.fsum(data._stateful_run_times.values()) - arg_stateful 

1017 ) 

1018 in_gctime = gc_cumulative_time() - arg_gctime 

1019 runtime = finish - start - in_drawtime - in_stateful - in_gctime 

1020 self._timing_features = { 

1021 "execute:test": runtime, 

1022 "overall:gc": in_gctime, 

1023 **data.draw_times, 

1024 **data._stateful_run_times, 

1025 } 

1026 

1027 if ( 

1028 (current_deadline := self.settings.deadline) is not None 

1029 # we disable the deadline check under concurrent threads, since 

1030 # cpython may switch away from a thread for arbitrarily long. 

1031 and not self.thread_overlap.get(threading.get_ident(), False) 

1032 ): 

1033 if not is_final: 

1034 current_deadline = (current_deadline // 4) * 5 

1035 if runtime >= current_deadline.total_seconds(): 

1036 raise DeadlineExceeded( 

1037 datetime.timedelta(seconds=runtime), self.settings.deadline 

1038 ) 

1039 return result 

1040 

1041 def run(data: ConjectureData) -> None: 

1042 # Set up dynamic context needed by a single test run. 

1043 if self.stuff.selfy is not None: 

1044 data.hypothesis_runner = self.stuff.selfy 

1045 # Generate all arguments to the test function. 

1046 args = self.stuff.args 

1047 kwargs = dict(self.stuff.kwargs) 

1048 if example_kwargs is None: 

1049 kw, argslices = context.prep_args_kwargs_from_strategies( 

1050 self.stuff.given_kwargs 

1051 ) 

1052 else: 

1053 kw = example_kwargs 

1054 argslices = {} 

1055 kwargs.update(kw) 

1056 if expected_failure is not None: 

1057 nonlocal text_repr 

1058 text_repr = repr_call(test, args, kwargs) 

1059 

1060 if print_example or current_verbosity() >= Verbosity.verbose: 

1061 printer = RepresentationPrinter(context=context) 

1062 if print_example: 

1063 printer.text("Falsifying example:") 

1064 else: 

1065 printer.text("Trying example:") 

1066 

1067 if self.print_given_args: 

1068 printer.text(" ") 

1069 printer.repr_call( 

1070 test.__name__, 

1071 args, 

1072 kwargs, 

1073 force_split=True, 

1074 arg_slices=argslices, 

1075 leading_comment=( 

1076 "# " + context.data.slice_comments[(0, 0)] 

1077 if (0, 0) in context.data.slice_comments 

1078 else None 

1079 ), 

1080 avoid_realization=data.provider.avoid_realization, 

1081 ) 

1082 report(printer.getvalue()) 

1083 

1084 if observability_enabled(): 

1085 printer = RepresentationPrinter(context=context) 

1086 printer.repr_call( 

1087 test.__name__, 

1088 args, 

1089 kwargs, 

1090 force_split=True, 

1091 arg_slices=argslices, 

1092 leading_comment=( 

1093 "# " + context.data.slice_comments[(0, 0)] 

1094 if (0, 0) in context.data.slice_comments 

1095 else None 

1096 ), 

1097 avoid_realization=data.provider.avoid_realization, 

1098 ) 

1099 self._string_repr = printer.getvalue() 

1100 

1101 try: 

1102 return test(*args, **kwargs) 

1103 except TypeError as e: 

1104 # If we sampled from a sequence of strategies, AND failed with a 

1105 # TypeError, *AND that exception mentions SearchStrategy*, add a note: 

1106 if ( 

1107 "SearchStrategy" in str(e) 

1108 and data._sampled_from_all_strategies_elements_message is not None 

1109 ): 

1110 msg, format_arg = data._sampled_from_all_strategies_elements_message 

1111 add_note(e, msg.format(format_arg)) 

1112 raise 

1113 finally: 

1114 if parts := getattr(data, "_stateful_repr_parts", None): 

1115 self._string_repr = "\n".join(parts) 

1116 

1117 if observability_enabled(): 

1118 printer = RepresentationPrinter(context=context) 

1119 for name, value in data._observability_args.items(): 

1120 if name.startswith("generate:Draw "): 

1121 try: 

1122 value = data.provider.realize(value) 

1123 except BackendCannotProceed: # pragma: no cover 

1124 value = "<backend failed to realize symbolic>" 

1125 printer.text(f"\n{name.removeprefix('generate:')}: ") 

1126 printer.pretty(value) 

1127 

1128 self._string_repr += printer.getvalue() 

1129 

1130 # self.test_runner can include the execute_example method, or setup/teardown 

1131 # _example, so it's important to get the PRNG and build context in place first. 

1132 # 

1133 # NOTE: For compatibility with Python 3.9's LL(1) parser, this is written as 

1134 # three nested with-statements, instead of one compound statement. 

1135 with local_settings(self.settings): 

1136 with deterministic_PRNG(): 

1137 with BuildContext( 

1138 data, is_final=is_final, wrapped_test=self.wrapped_test 

1139 ) as context: 

1140 # providers may throw in per_case_context_fn, and we'd like 

1141 # `result` to still be set in these cases. 

1142 result = None 

1143 with data.provider.per_test_case_context_manager(): 

1144 # Run the test function once, via the executor hook. 

1145 # In most cases this will delegate straight to `run(data)`. 

1146 result = self.test_runner(data, run) 

1147 

1148 # If a failure was expected, it should have been raised already, so 

1149 # instead raise an appropriate diagnostic error. 

1150 if expected_failure is not None: 

1151 exception, traceback = expected_failure 

1152 if isinstance(exception, DeadlineExceeded) and ( 

1153 runtime_secs := math.fsum( 

1154 v 

1155 for k, v in self._timing_features.items() 

1156 if k.startswith("execute:") 

1157 ) 

1158 ): 

1159 report( 

1160 "Unreliable test timings! On an initial run, this " 

1161 f"test took {exception.runtime.total_seconds() * 1000:.2f}ms, " 

1162 "which exceeded the deadline of " 

1163 f"{self.settings.deadline.total_seconds() * 1000:.2f}ms, but " 

1164 f"on a subsequent run it took {runtime_secs * 1000:.2f} ms, " 

1165 "which did not. If you expect this sort of " 

1166 "variability in your test timings, consider turning " 

1167 "deadlines off for this test by setting deadline=None." 

1168 ) 

1169 else: 

1170 report("Failed to reproduce exception. Expected: \n" + traceback) 

1171 raise FlakyFailure( 

1172 f"Hypothesis {text_repr} produces unreliable results: " 

1173 "Falsified on the first call but did not on a subsequent one", 

1174 [exception], 

1175 ) 

1176 return result 

1177 

1178 def _flaky_replay_to_failure( 

1179 self, err: FlakyReplay, context: BaseException 

1180 ) -> FlakyFailure: 

1181 # Note that in the mark_interesting case, _context_ itself 

1182 # is part of err._interesting_examples - but it's not in 

1183 # _runner.interesting_examples - this is fine, as the context 

1184 # (i.e., immediate exception) is appended. 

1185 interesting_examples = [ 

1186 self._runner.interesting_examples[origin] 

1187 for origin in err._interesting_origins 

1188 if origin in self._runner.interesting_examples 

1189 ] 

1190 exceptions = [result.expected_exception for result in interesting_examples] 

1191 exceptions.append(context) # the immediate exception 

1192 return FlakyFailure(err.reason, exceptions) 

1193 

1194 def _execute_once_for_engine(self, data: ConjectureData) -> None: 

1195 """Wrapper around ``execute_once`` that intercepts test failure 

1196 exceptions and single-test control exceptions, and turns them into 

1197 appropriate method calls to `data` instead. 

1198 

1199 This allows the engine to assume that any exception other than 

1200 ``StopTest`` must be a fatal error, and should stop the entire engine. 

1201 """ 

1202 trace: Trace = set() 

1203 try: 

1204 with Tracer(should_trace=self._should_trace()) as tracer: 

1205 try: 

1206 result = self.execute_once(data) 

1207 if ( 

1208 data.status == Status.VALID and tracer.branches 

1209 ): # pragma: no cover 

1210 # This is in fact covered by our *non-coverage* tests, but due 

1211 # to the settrace() contention *not* by our coverage tests. 

1212 self.explain_traces[None].add(frozenset(tracer.branches)) 

1213 finally: 

1214 trace = tracer.branches 

1215 if result is not None: 

1216 fail_health_check( 

1217 self.settings, 

1218 "Tests run under @given should return None, but " 

1219 f"{self.test.__name__} returned {result!r} instead.", 

1220 HealthCheck.return_value, 

1221 ) 

1222 except UnsatisfiedAssumption as e: 

1223 # An "assume" check failed, so instead we inform the engine that 

1224 # this test run was invalid. 

1225 try: 

1226 data.mark_invalid(e.reason) 

1227 except FlakyReplay as err: 

1228 # This was unexpected, meaning that the assume was flaky. 

1229 # Report it as such. 

1230 raise self._flaky_replay_to_failure(err, e) from None 

1231 except (StopTest, BackendCannotProceed): 

1232 # The engine knows how to handle this control exception, so it's 

1233 # OK to re-raise it. 

1234 raise 

1235 except ( 

1236 FailedHealthCheck, 

1237 *skip_exceptions_to_reraise(), 

1238 ): 

1239 # These are fatal errors or control exceptions that should stop the 

1240 # engine, so we re-raise them. 

1241 raise 

1242 except failure_exceptions_to_catch() as e: 

1243 # If an unhandled (i.e., non-Hypothesis) error was raised by 

1244 # Hypothesis-internal code, re-raise it as a fatal error instead 

1245 # of treating it as a test failure. 

1246 if isinstance(e, BaseExceptionGroup) and len(e.exceptions) == 1: 

1247 # When a naked exception is implicitly wrapped in an ExceptionGroup 

1248 # due to a re-raising "except*", the ExceptionGroup is constructed in 

1249 # the caller's stack frame (see #4183). This workaround is specifically 

1250 # for implicit wrapping of naked exceptions by "except*", since explicit 

1251 # raising of ExceptionGroup gets the proper traceback in the first place 

1252 # - there's no need to handle hierarchical groups here, at least if no 

1253 # such implicit wrapping happens inside hypothesis code (we only care 

1254 # about the hypothesis-or-not distinction). 

1255 # 

1256 # 01-25-2025: this was patched to give the correct 

1257 # stacktrace in cpython https://github.com/python/cpython/issues/128799. 

1258 # can remove once python3.11 is EOL. 

1259 tb = e.exceptions[0].__traceback__ or e.__traceback__ 

1260 else: 

1261 tb = e.__traceback__ 

1262 filepath = traceback.extract_tb(tb)[-1][0] 

1263 if ( 

1264 is_hypothesis_file(filepath) 

1265 and not isinstance(e, HypothesisException) 

1266 # We expect backend authors to use the provider_conformance test 

1267 # to test their backends. If an error occurs there, it is probably 

1268 # from their backend, and we would like to treat it as a standard 

1269 # error, not a hypothesis-internal error. 

1270 and not filepath.endswith( 

1271 f"internal{os.sep}conjecture{os.sep}provider_conformance.py" 

1272 ) 

1273 ): 

1274 raise 

1275 

1276 if data.frozen: 

1277 # This can happen if an error occurred in a finally 

1278 # block somewhere, suppressing our original StopTest. 

1279 # We raise a new one here to resume normal operation. 

1280 raise StopTest(data.testcounter) from e 

1281 else: 

1282 # The test failed by raising an exception, so we inform the 

1283 # engine that this test run was interesting. This is the normal 

1284 # path for test runs that fail. 

1285 tb = get_trimmed_traceback() 

1286 data.expected_traceback = format_exception(e, tb) 

1287 data.expected_exception = e 

1288 assert data.expected_traceback is not None # for mypy 

1289 verbose_report(data.expected_traceback) 

1290 

1291 self.failed_normally = True 

1292 

1293 interesting_origin = InterestingOrigin.from_exception(e) 

1294 if trace: # pragma: no cover 

1295 # Trace collection is explicitly disabled under coverage. 

1296 self.explain_traces[interesting_origin].add(frozenset(trace)) 

1297 if interesting_origin[0] == DeadlineExceeded: 

1298 self.failed_due_to_deadline = True 

1299 self.explain_traces.clear() 

1300 try: 

1301 data.mark_interesting(interesting_origin) 

1302 except FlakyReplay as err: 

1303 raise self._flaky_replay_to_failure(err, e) from None 

1304 

1305 finally: 

1306 # Conditional here so we can save some time constructing the payload; in 

1307 # other cases (without coverage) it's cheap enough to do that regardless. 

1308 if observability_enabled(): 

1309 if runner := getattr(self, "_runner", None): 

1310 phase = runner._current_phase 

1311 else: # pragma: no cover # in case of messing with internals 

1312 if self.failed_normally or self.failed_due_to_deadline: 

1313 phase = "shrink" 

1314 else: 

1315 phase = "unknown" 

1316 backend_desc = f", using backend={self.settings.backend!r}" * ( 

1317 self.settings.backend != "hypothesis" 

1318 and not getattr(runner, "_switch_to_hypothesis_provider", False) 

1319 ) 

1320 try: 

1321 data._observability_args = data.provider.realize( 

1322 data._observability_args 

1323 ) 

1324 except BackendCannotProceed: 

1325 data._observability_args = {} 

1326 

1327 try: 

1328 self._string_repr = data.provider.realize(self._string_repr) 

1329 except BackendCannotProceed: 

1330 self._string_repr = "<backend failed to realize symbolic arguments>" 

1331 

1332 try: 

1333 data.events = data.provider.realize(data.events) 

1334 except BackendCannotProceed: 

1335 data.events = {} 

1336 

1337 data.freeze() 

1338 tc = make_testcase( 

1339 run_start=self._start_timestamp, 

1340 property=self.test_identifier, 

1341 data=data, 

1342 how_generated=f"during {phase} phase{backend_desc}", 

1343 representation=self._string_repr, 

1344 arguments=data._observability_args, 

1345 timing=self._timing_features, 

1346 coverage=tractable_coverage_report(trace) or None, 

1347 phase=phase, 

1348 backend_metadata=data.provider.observe_test_case(), 

1349 ) 

1350 deliver_observation(tc) 

1351 for msg in data.provider.observe_information_messages( 

1352 lifetime="test_case" 

1353 ): 

1354 self._deliver_information_message(**msg) 

1355 self._timing_features = {} 

1356 

1357 def _deliver_information_message( 

1358 self, *, type: InfoObservationType, title: str, content: Union[str, dict] 

1359 ) -> None: 

1360 deliver_observation( 

1361 InfoObservation( 

1362 type=type, 

1363 run_start=self._start_timestamp, 

1364 property=self.test_identifier, 

1365 title=title, 

1366 content=content, 

1367 ) 

1368 ) 

1369 

1370 def run_engine(self): 

1371 """Run the test function many times, on database input and generated 

1372 input, using the Conjecture engine. 

1373 """ 

1374 # Tell pytest to omit the body of this function from tracebacks 

1375 __tracebackhide__ = True 

1376 try: 

1377 database_key = self.wrapped_test._hypothesis_internal_database_key 

1378 except AttributeError: 

1379 if global_force_seed is None: 

1380 database_key = function_digest(self.test) 

1381 else: 

1382 database_key = None 

1383 

1384 runner = self._runner = ConjectureRunner( 

1385 self._execute_once_for_engine, 

1386 settings=self.settings, 

1387 random=self.random, 

1388 database_key=database_key, 

1389 thread_overlap=self.thread_overlap, 

1390 ) 

1391 # Use the Conjecture engine to run the test function many times 

1392 # on different inputs. 

1393 runner.run() 

1394 note_statistics(runner.statistics) 

1395 if observability_enabled(): 

1396 self._deliver_information_message( 

1397 type="info", 

1398 title="Hypothesis Statistics", 

1399 content=describe_statistics(runner.statistics), 

1400 ) 

1401 for msg in ( 

1402 p if isinstance(p := runner.provider, PrimitiveProvider) else p(None) 

1403 ).observe_information_messages(lifetime="test_function"): 

1404 self._deliver_information_message(**msg) 

1405 

1406 if runner.call_count == 0: 

1407 return 

1408 if runner.interesting_examples: 

1409 self.falsifying_examples = sorted( 

1410 runner.interesting_examples.values(), 

1411 key=lambda d: sort_key(d.nodes), 

1412 reverse=True, 

1413 ) 

1414 else: 

1415 if runner.valid_examples == 0: 

1416 explanations = [] 

1417 # use a somewhat arbitrary cutoff to avoid recommending spurious 

1418 # fixes. 

1419 # e.g., a few invalid examples from internal filters when the 

1420 # problem is the user generating large inputs, or a 

1421 # few overruns during internal mutation when the problem is 

1422 # user filters/assumes that are impossible to satisfy. 

1423 if runner.invalid_examples > min(20, runner.call_count // 5): 

1424 explanations.append( 

1425 f"{runner.invalid_examples} of {runner.call_count} " 

1426 "examples failed a .filter() or assume() condition. Try " 

1427 "making your filters or assumes less strict, or rewrite " 

1428 "using strategy parameters: " 

1429 "st.integers().filter(lambda x: x > 0) fails less often " 

1430 "(that is, never) when rewritten as st.integers(min_value=1)." 

1431 ) 

1432 if runner.overrun_examples > min(20, runner.call_count // 5): 

1433 explanations.append( 

1434 f"{runner.overrun_examples} of {runner.call_count} " 

1435 "examples were too large to finish generating; try " 

1436 "reducing the typical size of your inputs?" 

1437 ) 

1438 rep = get_pretty_function_description(self.test) 

1439 raise Unsatisfiable( 

1440 f"Unable to satisfy assumptions of {rep}. " 

1441 f"{' Also, '.join(explanations)}" 

1442 ) 

1443 

1444 # If we have not traced executions, warn about that now (but only when 

1445 # we'd expect to do so reliably, i.e. on CPython>=3.12) 

1446 if ( 

1447 hasattr(sys, "monitoring") 

1448 and not PYPY 

1449 and self._should_trace() 

1450 and not Tracer.can_trace() 

1451 ): # pragma: no cover 

1452 # actually covered by our tests, but only on >= 3.12 

1453 warnings.warn( 

1454 "avoiding tracing test function because tool id " 

1455 f"{MONITORING_TOOL_ID} is already taken by tool " 

1456 f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.", 

1457 HypothesisWarning, 

1458 stacklevel=3, 

1459 ) 

1460 

1461 if not self.falsifying_examples: 

1462 return 

1463 elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups): 

1464 # Pretend that we only found one failure, by discarding the others. 

1465 del self.falsifying_examples[:-1] 

1466 

1467 # The engine found one or more failures, so we need to reproduce and 

1468 # report them. 

1469 

1470 errors_to_report = [] 

1471 

1472 report_lines = describe_targets(runner.best_observed_targets) 

1473 if report_lines: 

1474 report_lines.append("") 

1475 

1476 explanations = explanatory_lines(self.explain_traces, self.settings) 

1477 for falsifying_example in self.falsifying_examples: 

1478 fragments = [] 

1479 

1480 ran_example = runner.new_conjecture_data( 

1481 falsifying_example.choices, max_choices=len(falsifying_example.choices) 

1482 ) 

1483 ran_example.slice_comments = falsifying_example.slice_comments 

1484 tb = None 

1485 origin = None 

1486 assert falsifying_example.expected_exception is not None 

1487 assert falsifying_example.expected_traceback is not None 

1488 try: 

1489 with with_reporter(fragments.append): 

1490 self.execute_once( 

1491 ran_example, 

1492 print_example=not self.is_find, 

1493 is_final=True, 

1494 expected_failure=( 

1495 falsifying_example.expected_exception, 

1496 falsifying_example.expected_traceback, 

1497 ), 

1498 ) 

1499 except StopTest as e: 

1500 # Link the expected exception from the first run. Not sure 

1501 # how to access the current exception, if it failed 

1502 # differently on this run. In fact, in the only known 

1503 # reproducer, the StopTest is caused by OVERRUN before the 

1504 # test is even executed. Possibly because all initial examples 

1505 # failed until the final non-traced replay, and something was 

1506 # exhausted? Possibly a FIXME, but sufficiently weird to 

1507 # ignore for now. 

1508 err = FlakyFailure( 

1509 "Inconsistent results: An example failed on the " 

1510 "first run but now succeeds (or fails with another " 

1511 "error, or is for some reason not runnable).", 

1512 # (note: e is a BaseException) 

1513 [falsifying_example.expected_exception or e], 

1514 ) 

1515 errors_to_report.append((fragments, err)) 

1516 except UnsatisfiedAssumption as e: # pragma: no cover # ironically flaky 

1517 err = FlakyFailure( 

1518 "Unreliable assumption: An example which satisfied " 

1519 "assumptions on the first run now fails it.", 

1520 [e], 

1521 ) 

1522 errors_to_report.append((fragments, err)) 

1523 except BaseException as e: 

1524 # If we have anything for explain-mode, this is the time to report. 

1525 fragments.extend(explanations[falsifying_example.interesting_origin]) 

1526 errors_to_report.append( 

1527 (fragments, e.with_traceback(get_trimmed_traceback())) 

1528 ) 

1529 tb = format_exception(e, get_trimmed_traceback(e)) 

1530 origin = InterestingOrigin.from_exception(e) 

1531 else: 

1532 # execute_once() will always raise either the expected error, or Flaky. 

1533 raise NotImplementedError("This should be unreachable") 

1534 finally: 

1535 ran_example.freeze() 

1536 # log our observability line for the final failing example 

1537 tc = make_testcase( 

1538 run_start=self._start_timestamp, 

1539 property=self.test_identifier, 

1540 data=ran_example, 

1541 how_generated="minimal failing example", 

1542 representation=self._string_repr, 

1543 arguments=ran_example._observability_args, 

1544 timing=self._timing_features, 

1545 coverage=None, # Not recorded when we're replaying the MFE 

1546 status="passed" if sys.exc_info()[0] else "failed", 

1547 status_reason=str(origin or "unexpected/flaky pass"), 

1548 metadata={"traceback": tb}, 

1549 ) 

1550 deliver_observation(tc) 

1551 # Whether or not replay actually raised the exception again, we want 

1552 # to print the reproduce_failure decorator for the failing example. 

1553 if self.settings.print_blob: 

1554 fragments.append( 

1555 "\nYou can reproduce this example by temporarily adding " 

1556 f"{reproduction_decorator(falsifying_example.choices)} " 

1557 "as a decorator on your test case" 

1558 ) 
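# The appended line renders roughly as
#     @reproduce_failure('<hypothesis version>', b'<opaque blob>')
# (illustrative shape only; the exact text comes from reproduction_decorator above).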

1559 

1560 _raise_to_user( 

1561 errors_to_report, 

1562 self.settings, 

1563 report_lines, 

1564 # A backend might report a failure and then report verified afterwards, 

1565 # which is to be interpreted as "there are no more failures *other 

1566 # than what we already reported*". Do not report this as unsound. 

1567 unsound_backend=( 

1568 runner._verified_by 

1569 if runner._verified_by and not runner._backend_found_failure 

1570 else None 

1571 ), 

1572 ) 

1573 

1574 

1575def _raise_to_user( 

1576 errors_to_report, settings, target_lines, trailer="", *, unsound_backend=None 

1577): 

1578 """Helper function for attaching notes and grouping multiple errors.""" 

1579 failing_prefix = "Falsifying example: " 

1580 ls = [] 

1581 for fragments, err in errors_to_report: 

1582 for note in fragments: 

1583 add_note(err, note) 

1584 if note.startswith(failing_prefix): 

1585 ls.append(note.removeprefix(failing_prefix)) 

1586 if current_pytest_item.value: 

1587 current_pytest_item.value._hypothesis_failing_examples = ls 

1588 

1589 if len(errors_to_report) == 1: 

1590 _, the_error_hypothesis_found = errors_to_report[0] 

1591 else: 

1592 assert errors_to_report 

1593 the_error_hypothesis_found = BaseExceptionGroup( 

1594 f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.", 

1595 [e for _, e in errors_to_report], 

1596 ) 

1597 

1598 if settings.verbosity >= Verbosity.normal: 

1599 for line in target_lines: 

1600 add_note(the_error_hypothesis_found, line) 

1601 

1602 if unsound_backend: 

1603 add_note( 

1604 err, 

1605 f"backend={unsound_backend!r} claimed to verify this test passes - " 

1606 "please send them a bug report!", 

1607 ) 

1608 

1609 raise the_error_hypothesis_found 

1610 

1611 

1612@contextlib.contextmanager 

1613def fake_subTest(self, msg=None, **__): 

1614 """Monkeypatch for `unittest.TestCase.subTest` during `@given`. 

1615 

1616 If we don't patch this out, each failing example is reported as a 

1617 separate failing test by the unittest test runner, which is 

1618 obviously incorrect. We therefore replace it for the duration with 

1619 this version. 

1620 """ 

1621 warnings.warn( 

1622 "subTest per-example reporting interacts badly with Hypothesis " 

1623 "trying hundreds of examples, so we disable it for the duration of " 

1624 "any test that uses `@given`.", 

1625 HypothesisWarning, 

1626 stacklevel=2, 

1627 ) 

1628 yield 

1629 

1630 

1631@dataclass 

1632class HypothesisHandle: 

1633 """This object is provided as the .hypothesis attribute on @given tests. 

1634 

1635 Downstream users can reassign its attributes to insert custom logic into 

1636 the execution of each case, for example by converting an async test

1637 function into a sync one.

1639 This must be an attribute of an attribute, because reassignment of a 

1640 first-level attribute would not be visible to Hypothesis if the function 

1641 had been decorated before the assignment. 

1642 

1643 See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more 

1644 information. 

1645 """ 

1646 

1647 inner_test: Any 

1648 _get_fuzz_target: Any 

1649 _given_kwargs: Any 

1650 

1651 @property 

1652 def fuzz_one_input( 

1653 self, 

1654 ) -> Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]]: 

1655 """Run the test as a fuzz target, driven with the `buffer` of bytes. 

1656 

1657 Returns None if the buffer was invalid for the strategy, canonical pruned

1658 bytes if the buffer was valid, and leaves raised exceptions alone. 

1659 """ 

1660 # Note: most users, if they care about fuzzer performance, will access the 

1661 # property and assign it to a local variable to move the attribute lookup 

1662 # outside their fuzzing loop / before the fork point. We cache it anyway, 

1663 # so that naive or unusual use-cases get the best possible performance too. 

1664 try: 

1665 return self.__cached_target # type: ignore 

1666 except AttributeError: 

1667 self.__cached_target = self._get_fuzz_target() 

1668 return self.__cached_target 
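# Illustrative use of the hoisting pattern mentioned above (hypothetical names:
# `my_test` is any @given-decorated test, and `next_buffer` is whatever byte
# source the fuzzer provides):
#
#     fuzz = my_test.hypothesis.fuzz_one_input  # one attribute lookup, hoisted
#     while buffers_remain():
#         fuzz(next_buffer())  # None for invalid buffers; real failures raise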

1669 

1670 

1671@overload 

1672def given( 

1673 _: EllipsisType, / 

1674) -> Callable[ 

1675 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[[], None] 

1676]: # pragma: no cover 

1677 ... 

1678 

1679 

1680@overload 

1681def given( 

1682 *_given_arguments: SearchStrategy[Any], 

1683) -> Callable[ 

1684 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1685]: # pragma: no cover 

1686 ... 

1687 

1688 

1689@overload 

1690def given( 

1691 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType], 

1692) -> Callable[ 

1693 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1694]: # pragma: no cover 

1695 ... 

1696 

1697 

1698def given( 

1699 *_given_arguments: Union[SearchStrategy[Any], EllipsisType], 

1700 **_given_kwargs: Union[SearchStrategy[Any], EllipsisType], 

1701) -> Callable[ 

1702 [Callable[..., Optional[Coroutine[Any, Any, None]]]], Callable[..., None] 

1703]: 

1704 """ 

1705 The |@given| decorator turns a function into a Hypothesis test. This is the 

1706 main entry point to Hypothesis. 

1707 

1708 .. seealso:: 

1709 

1710 The :doc:`/tutorial/introduction` tutorial introduces defining

1711 Hypothesis tests with |@given|.

1712 

1713 .. _given-arguments: 

1714 

1715 Arguments to ``@given`` 

1716 ----------------------- 

1717 

1718 Arguments to |@given| may be either positional or keyword arguments: 

1719 

1720 .. code-block:: python 

1721 

1722 @given(st.integers(), st.floats()) 

1723 def test_one(x, y): 

1724 pass 

1725 

1726 @given(x=st.integers(), y=st.floats()) 

1727 def test_two(x, y): 

1728 pass 

1729 

1730 If using keyword arguments, the arguments may appear in any order, as with 

1731 standard Python functions: 

1732 

1733 .. code-block:: python 

1734 

1735 # different order, but still equivalent to before 

1736 @given(y=st.floats(), x=st.integers()) 

1737 def test(x, y): 

1738 assert isinstance(x, int) 

1739 assert isinstance(y, float) 

1740 

1741 If |@given| is provided fewer positional arguments than the decorated test, 

1742 the test arguments are filled in on the right side, leaving the leftmost 

1743 positional arguments unfilled: 

1744 

1745 .. code-block:: python 

1746 

1747 @given(st.integers(), st.floats()) 

1748 def test(manual_string, y, z): 

1749 assert manual_string == "x" 

1750 assert isinstance(y, int) 

1751 assert isinstance(z, float) 

1752 

1753 # `test` is now a callable which takes one argument `manual_string` 

1754 

1755 test("x") 

1756 # or equivalently: 

1757 test(manual_string="x") 

1758 

1759 The reason for this "from the right" behavior is to support using |@given| 

1760 with instance methods, by passing through ``self``: 

1761 

1762 .. code-block:: python 

1763 

1764 class MyTest(TestCase): 

1765 @given(st.integers()) 

1766 def test(self, x): 

1767 assert isinstance(self, MyTest) 

1768 assert isinstance(x, int) 

1769 

1770 If (and only if) using keyword arguments, |@given| may be combined with 

1771 ``**kwargs`` or ``*args``: 

1772 

1773 .. code-block:: python 

1774 

1775 @given(x=integers(), y=integers()) 

1776 def test(x, **kwargs): 

1777 assert "y" in kwargs 

1778 

1779 @given(x=integers(), y=integers()) 

1780 def test(x, *args, **kwargs): 

1781 assert args == () 

1782 assert "x" not in kwargs 

1783 assert "y" in kwargs 

1784 

1785 It is an error to: 

1786 

1787 * Mix positional and keyword arguments to |@given|. 

1788 * Use |@given| with a function that has a default value for an argument. 

1789 * Use |@given| with positional arguments on a function that uses ``*args``,

1790 ``**kwargs``, or keyword-only arguments. 

1791 

1792 The function returned by |@given| has all the same arguments as the original

1793 test, minus those that are filled in by |@given|. See the :ref:`notes on 

1794 framework compatibility <framework-compatibility>` for how this interacts 

1795 with features of other testing libraries, such as :pypi:`pytest` fixtures. 

1796 """ 

1797 

1798 if currently_in_test_context(): 

1799 fail_health_check( 

1800 Settings(), 

1801 "Nesting @given tests results in quadratic generation and shrinking " 

1802 "behavior, and can usually be more cleanly expressed by replacing the " 

1803 "inner function with an st.data() parameter on the outer @given." 

1804 "\n\n" 

1805 "If it is difficult or impossible to refactor this test to remove the " 

1806 "nested @given, you can disable this health check with " 

1807 "@settings(suppress_health_check=[HealthCheck.nested_given]) on the " 

1808 "outer @given. See " 

1809 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

1810 "for details.", 

1811 HealthCheck.nested_given, 

1812 ) 
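# A sketch of the st.data() refactor recommended above (illustrative only; the
# test body and strategies are hypothetical):
#
#     @given(st.integers(), st.data())
#     def test_outer(x, data):
#         # draw here what the nested @given would have generated
#         y = data.draw(st.integers(min_value=x))
#         assert x <= y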

1813 

1814 def run_test_as_given(test): 

1815 if inspect.isclass(test): 

1816 # Provide a meaningful error to users, instead of exceptions from 

1817 # internals that assume we're dealing with a function. 

1818 raise InvalidArgument("@given cannot be applied to a class") 

1819 

1820 if ( 

1821 "_pytest" in sys.modules 

1822 and "_pytest.fixtures" in sys.modules 

1823 and ( 

1824 tuple(map(int, sys.modules["_pytest"].__version__.split(".")[:2])) 

1825 >= (8, 4) 

1826 ) 

1827 and isinstance( 

1828 test, sys.modules["_pytest.fixtures"].FixtureFunctionDefinition 

1829 ) 

1830 ): # pragma: no cover # covered by pytest/test_fixtures, but not by cover/ 

1831 raise InvalidArgument("@given cannot be applied to a pytest fixture") 

1832 

1833 given_arguments = tuple(_given_arguments) 

1834 given_kwargs = dict(_given_kwargs) 

1835 

1836 original_sig = get_signature(test) 

1837 if given_arguments == (Ellipsis,) and not given_kwargs: 

1838 # user indicated that they want to infer all arguments 

1839 given_kwargs = { 

1840 p.name: Ellipsis 

1841 for p in original_sig.parameters.values() 

1842 if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY) 

1843 } 

1844 given_arguments = () 

1845 

1846 check_invalid = is_invalid_test( 

1847 test, original_sig, given_arguments, given_kwargs 

1848 ) 

1849 

1850 # If the argument check found problems, return a dummy test function 

1851 # that will raise an error if it is actually called. 

1852 if check_invalid is not None: 

1853 return check_invalid 

1854 

1855 # Because the argument check succeeded, we can convert @given's 

1856 # positional arguments into keyword arguments for simplicity. 

1857 if given_arguments: 

1858 assert not given_kwargs 

1859 posargs = [ 

1860 p.name 

1861 for p in original_sig.parameters.values() 

1862 if p.kind is p.POSITIONAL_OR_KEYWORD 

1863 ] 

1864 given_kwargs = dict(list(zip(posargs[::-1], given_arguments[::-1]))[::-1]) 
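# e.g. posargs == ["self", "x", "y"] with two positional strategies fills from
# the right, giving {"x": <first strategy>, "y": <second strategy>} and leaving
# "self" unfilled (illustrative of the reversed-zip above).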

1865 # These have been converted, so delete them to prevent accidental use. 

1866 del given_arguments 

1867 

1868 new_signature = new_given_signature(original_sig, given_kwargs) 

1869 

1870 # Use type information to convert "infer" arguments into appropriate strategies. 

1871 if ... in given_kwargs.values(): 

1872 hints = get_type_hints(test) 

1873 for name in [name for name, value in given_kwargs.items() if value is ...]: 

1874 if name not in hints: 

1875 return _invalid( 

1876 f"passed {name}=... for {test.__name__}, but {name} has " 

1877 "no type annotation", 

1878 test=test, 

1879 given_kwargs=given_kwargs, 

1880 ) 

1881 given_kwargs[name] = st.from_type(hints[name]) 

1882 

1883 # only raise if the same thread uses two different executors, not if two 

1884 # different threads use different executors. 

1885 thread_local = ThreadLocal(prev_self=lambda: not_set) 

1886 # maps thread_id to whether that thread overlaps in execution with any 

1887 # other thread in this @given. We use this to detect whether an @given is 

1888 # being run from multiple different threads at once, which informs 

1889 # decisions like whether to raise DeadlineExceeded or HealthCheck.too_slow. 

1890 thread_overlap: dict[int, bool] = {} 
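# e.g. a thread running this test alone maps to False; as soon as a second
# thread enters the same @given while the first is still running, both entries
# are flipped to True by the bookkeeping in wrapped_test below (illustrative).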

1891 thread_overlap_lock = Lock() 

1892 

1893 @impersonate(test) 

1894 @define_function_signature(test.__name__, test.__doc__, new_signature) 

1895 def wrapped_test(*arguments, **kwargs): 

1896 # Tell pytest to omit the body of this function from tracebacks 

1897 __tracebackhide__ = True 

1898 with thread_overlap_lock: 

1899 for overlap_thread_id in thread_overlap: 

1900 thread_overlap[overlap_thread_id] = True 

1901 

1902 threadid = threading.get_ident() 

1903 # if there are existing threads when this thread starts, then 

1904 # this thread starts at an overlapped state. 

1905 has_existing_threads = len(thread_overlap) > 0 

1906 thread_overlap[threadid] = has_existing_threads 

1907 

1908 try: 

1909 test = wrapped_test.hypothesis.inner_test 

1910 if getattr(test, "is_hypothesis_test", False): 

1911 raise InvalidArgument( 

1912 f"You have applied @given to the test {test.__name__} more than " 

1913 "once, which wraps the test several times and is extremely slow. " 

1914 "A similar effect can be gained by combining the arguments " 

1915 "of the two calls to given. For example, instead of " 

1916 "@given(booleans()) @given(integers()), you could write " 

1917 "@given(booleans(), integers())" 

1918 ) 

1919 

1920 settings = wrapped_test._hypothesis_internal_use_settings 

1921 random = get_random_for_wrapped_test(test, wrapped_test) 

1922 arguments, kwargs, stuff = process_arguments_to_given( 

1923 wrapped_test, 

1924 arguments, 

1925 kwargs, 

1926 given_kwargs, 

1927 new_signature.parameters, 

1928 ) 

1929 

1930 if ( 

1931 inspect.iscoroutinefunction(test) 

1932 and get_executor(stuff.selfy) is default_executor 

1933 ): 

1934 # See https://github.com/HypothesisWorks/hypothesis/issues/3054 

1935 # If our custom executor doesn't handle coroutines, or we return an 

1936 # awaitable from a non-async-def function, we just rely on the 

1937 # return_value health check. This catches most user errors though. 

1938 raise InvalidArgument( 

1939 "Hypothesis doesn't know how to run async test functions like " 

1940 f"{test.__name__}. You'll need to write a custom executor, " 

1941 "or use a library like pytest-asyncio or pytest-trio which can " 

1942 "handle the translation for you.\n See https://hypothesis." 

1943 "readthedocs.io/en/latest/details.html#custom-function-execution" 

1944 ) 

1945 

1946 runner = stuff.selfy 

1947 if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase): 

1948 fail_health_check( 

1949 settings, 

1950 f"You have applied @given to the method {test.__name__}, which is " 

1951 "used by the unittest runner but is not itself a test. " 

1952 "This is not useful in any way.", 

1953 HealthCheck.not_a_test_method, 

1954 ) 

1955 if bad_django_TestCase(runner): # pragma: no cover 

1956 # Covered by the Django tests, but not the pytest coverage task 

1957 raise InvalidArgument( 

1958 "You have applied @given to a method on " 

1959 f"{type(runner).__qualname__}, but this " 

1960 "class does not inherit from the supported versions in " 

1961 "`hypothesis.extra.django`. Use the Hypothesis variants " 

1962 "to ensure that each example is run in a separate " 

1963 "database transaction." 

1964 ) 

1965 

1966 nonlocal thread_local 

1967 # Check selfy really is self (not e.g. a mock) before we health-check 

1968 cur_self = ( 

1969 stuff.selfy 

1970 if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test 

1971 else None 

1972 ) 

1973 if thread_local.prev_self is not_set: 

1974 thread_local.prev_self = cur_self 

1975 elif cur_self is not thread_local.prev_self: 

1976 fail_health_check( 

1977 settings, 

1978 f"The method {test.__qualname__} was called from multiple " 

1979 "different executors. This may lead to flaky tests and " 

1980 "nonreproducible errors when replaying from database." 

1981 "\n\n" 

1982 "Unlike most health checks, HealthCheck.differing_executors " 

1983 "warns about a correctness issue with your test. We " 

1984 "therefore recommend fixing the underlying issue, rather " 

1985 "than suppressing this health check. However, if you are " 

1986 "confident this health check can be safely disabled, you can " 

1987 "do so with " 

1988 "@settings(suppress_health_check=[HealthCheck.differing_executors]). " 

1989 "See " 

1990 "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck " 

1991 "for details.", 

1992 HealthCheck.differing_executors, 

1993 ) 

1994 

1995 state = StateForActualGivenExecution( 

1996 stuff, 

1997 test, 

1998 settings, 

1999 random, 

2000 wrapped_test, 

2001 thread_overlap=thread_overlap, 

2002 ) 

2003 

2004 # If there was a @reproduce_failure decorator, use it to reproduce 

2005 # the error (or complain that we couldn't). Either way, this will 

2006 # always raise some kind of error. 

2007 if ( 

2008 reproduce_failure := wrapped_test._hypothesis_internal_use_reproduce_failure 

2009 ) is not None: 

2010 expected_version, failure = reproduce_failure 

2011 if expected_version != __version__: 

2012 raise InvalidArgument( 

2013 "Attempting to reproduce a failure from a different " 

2014 f"version of Hypothesis. This failure is from {expected_version}, but " 

2015 f"you are currently running {__version__!r}. Please change your " 

2016 "Hypothesis version to a matching one." 

2017 ) 

2018 try: 

2019 state.execute_once( 

2020 ConjectureData.for_choices(decode_failure(failure)), 

2021 print_example=True, 

2022 is_final=True, 

2023 ) 

2024 raise DidNotReproduce( 

2025 "Expected the test to raise an error, but it " 

2026 "completed successfully." 

2027 ) 

2028 except StopTest: 

2029 raise DidNotReproduce( 

2030 "The shape of the test data has changed in some way " 

2031 "from where this blob was defined. Are you sure " 

2032 "you're running the same test?" 

2033 ) from None 

2034 except UnsatisfiedAssumption: 

2035 raise DidNotReproduce( 

2036 "The test data failed to satisfy an assumption in the " 

2037 "test. Have you added it since this blob was generated?" 

2038 ) from None 

2039 

2040 # There was no @reproduce_failure, so start by running any explicit 

2041 # examples from @example decorators. 
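# (i.e. inputs pinned with decorators such as `@example(x=0)`; illustrative value.)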

2042 if errors := list( 

2043 execute_explicit_examples( 

2044 state, wrapped_test, arguments, kwargs, original_sig 

2045 ) 

2046 ): 

2047 # If we're not going to report multiple bugs, we would have 

2048 # stopped running explicit examples at the first failure. 

2049 assert len(errors) == 1 or state.settings.report_multiple_bugs 

2050 

2051 # If an explicit example raised a 'skip' exception, ensure it's never 

2052 # wrapped up in an exception group. Because we break out of the loop 

2053 # immediately on finding a skip, if present it's always the last error. 

2054 if isinstance(errors[-1][1], skip_exceptions_to_reraise()): 

2055 # Covered by `test_issue_3453_regression`, just in a subprocess. 

2056 del errors[:-1] # pragma: no cover 

2057 

2058 _raise_to_user(errors, state.settings, [], " in explicit examples") 

2059 

2060 # If there were any explicit examples, they all ran successfully. 

2061 # The next step is to use the Conjecture engine to run the test on 

2062 # many different inputs. 

2063 ran_explicit_examples = ( 

2064 Phase.explicit in state.settings.phases 

2065 and getattr(wrapped_test, "hypothesis_explicit_examples", ()) 

2066 ) 

2067 SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest( 

2068 "Hypothesis has been told to run no examples for this test." 

2069 ) 

2070 if not ( 

2071 Phase.reuse in settings.phases or Phase.generate in settings.phases 

2072 ): 

2073 if not ran_explicit_examples: 

2074 raise SKIP_BECAUSE_NO_EXAMPLES 

2075 return 

2076 

2077 try: 

2078 if isinstance(runner, TestCase) and hasattr(runner, "subTest"): 

2079 subTest = runner.subTest 

2080 try: 

2081 runner.subTest = types.MethodType(fake_subTest, runner) 

2082 state.run_engine() 

2083 finally: 

2084 runner.subTest = subTest 

2085 else: 

2086 state.run_engine() 

2087 except BaseException as e: 

2088 # The exception caught here should either be an actual test 

2089 # failure (or BaseExceptionGroup), or some kind of fatal error 

2090 # that caused the engine to stop. 

2091 generated_seed = ( 

2092 wrapped_test._hypothesis_internal_use_generated_seed 

2093 ) 

2094 with local_settings(settings): 

2095 if not (state.failed_normally or generated_seed is None): 

2096 if running_under_pytest: 

2097 report( 

2098 f"You can add @seed({generated_seed}) to this test or " 

2099 f"run pytest with --hypothesis-seed={generated_seed} " 

2100 "to reproduce this failure." 

2101 ) 

2102 else: 

2103 report( 

2104 f"You can add @seed({generated_seed}) to this test to " 

2105 "reproduce this failure." 

2106 ) 

2107 # The dance here is to avoid showing users long tracebacks 

2108 # full of Hypothesis internals they don't care about. 

2109 # We have to do this inline, to avoid adding another 

2110 # internal stack frame just when we've removed the rest. 

2111 # 

2112 # Using a variable for our trimmed error ensures that the line 

2113 # which will actually appear in tracebacks is as clear as 

2114 # possible - "raise the_error_hypothesis_found". 

2115 the_error_hypothesis_found = e.with_traceback( 

2116 None 

2117 if isinstance(e, BaseExceptionGroup) 

2118 else get_trimmed_traceback() 

2119 ) 

2120 raise the_error_hypothesis_found 

2121 

2122 if not (ran_explicit_examples or state.ever_executed): 

2123 raise SKIP_BECAUSE_NO_EXAMPLES 

2124 finally: 

2125 with thread_overlap_lock: 

2126 del thread_overlap[threadid] 

2127 

2128 def _get_fuzz_target() -> ( 

2129 Callable[[Union[bytes, bytearray, memoryview, BinaryIO]], Optional[bytes]] 

2130 ): 

2131 # Because fuzzing interfaces are very performance-sensitive, we use a 

2132 # somewhat more complicated structure here. `_get_fuzz_target()` is 

2133 # called by the `HypothesisHandle.fuzz_one_input` property, allowing 

2134 # us to defer our collection of the settings, random instance, and 

2135 # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed. 

2136 # 

2137 # We then share the performance cost of setting up `state` between 

2138 # many invocations of the target. We explicitly force `deadline=None` 

2139 # for performance reasons, saving ~40% of the runtime of an empty test.

2140 test = wrapped_test.hypothesis.inner_test 

2141 settings = Settings( 

2142 parent=wrapped_test._hypothesis_internal_use_settings, deadline=None 

2143 ) 

2144 random = get_random_for_wrapped_test(test, wrapped_test) 

2145 _args, _kwargs, stuff = process_arguments_to_given( 

2146 wrapped_test, (), {}, given_kwargs, new_signature.parameters 

2147 ) 

2148 assert not _args 

2149 assert not _kwargs 

2150 state = StateForActualGivenExecution( 

2151 stuff, 

2152 test, 

2153 settings, 

2154 random, 

2155 wrapped_test, 

2156 thread_overlap=thread_overlap, 

2157 ) 

2158 database_key = function_digest(test) + b".secondary" 

2159 # We track the minimal-so-far example for each distinct origin, so 

2160 # that we track log-n instead of n examples for long runs. In particular 

2161 # it means that we saturate for common errors in long runs instead of 

2162 # storing huge volumes of low-value data. 

2163 minimal_failures: dict = {} 

2164 

2165 def fuzz_one_input( 

2166 buffer: Union[bytes, bytearray, memoryview, BinaryIO], 

2167 ) -> Optional[bytes]: 

2168 # This inner part is all that the fuzzer will actually run, 

2169 # so we keep it as small and as fast as possible. 

2170 if isinstance(buffer, io.IOBase): 

2171 buffer = buffer.read(BUFFER_SIZE) 

2172 assert isinstance(buffer, (bytes, bytearray, memoryview)) 

2173 data = ConjectureData( 

2174 random=None, 

2175 provider=BytestringProvider, 

2176 provider_kw={"bytestring": buffer}, 

2177 ) 

2178 try: 

2179 state.execute_once(data) 

2180 status = Status.VALID 

2181 except StopTest: 

2182 status = data.status 

2183 return None 

2184 except UnsatisfiedAssumption: 

2185 status = Status.INVALID 

2186 return None 

2187 except BaseException: 

2188 known = minimal_failures.get(data.interesting_origin) 

2189 if settings.database is not None and ( 

2190 known is None or sort_key(data.nodes) <= sort_key(known) 

2191 ): 

2192 settings.database.save( 

2193 database_key, choices_to_bytes(data.choices) 

2194 ) 

2195 minimal_failures[data.interesting_origin] = data.nodes 

2196 status = Status.INTERESTING 

2197 raise 

2198 finally: 

2199 if observability_enabled(): 

2200 data.freeze() 

2201 tc = make_testcase( 

2202 run_start=state._start_timestamp, 

2203 property=state.test_identifier, 

2204 data=data, 

2205 how_generated="fuzz_one_input", 

2206 representation=state._string_repr, 

2207 arguments=data._observability_args, 

2208 timing=state._timing_features, 

2209 coverage=None, 

2210 status=status, 

2211 backend_metadata=data.provider.observe_test_case(), 

2212 ) 

2213 deliver_observation(tc) 

2214 state._timing_features = {} 

2215 

2216 assert isinstance(data.provider, BytestringProvider) 

2217 return bytes(data.provider.drawn) 

2218 

2219 fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__ 

2220 return fuzz_one_input 

2221 

2222 # After having created the decorated test function, we need to copy 

2223 # over some attributes to make the switch as seamless as possible. 

2224 

2225 for attrib in dir(test): 

2226 if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)): 

2227 setattr(wrapped_test, attrib, getattr(test, attrib)) 

2228 wrapped_test.is_hypothesis_test = True 

2229 if hasattr(test, "_hypothesis_internal_settings_applied"): 

2230 # Used to check if @settings is applied twice. 

2231 wrapped_test._hypothesis_internal_settings_applied = True 

2232 wrapped_test._hypothesis_internal_use_seed = getattr( 

2233 test, "_hypothesis_internal_use_seed", None 

2234 ) 

2235 wrapped_test._hypothesis_internal_use_settings = ( 

2236 getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default 

2237 ) 

2238 wrapped_test._hypothesis_internal_use_reproduce_failure = getattr( 

2239 test, "_hypothesis_internal_use_reproduce_failure", None 

2240 ) 

2241 wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs) 

2242 return wrapped_test 

2243 

2244 return run_test_as_given 

2245 

2246 

2247def find( 

2248 specifier: SearchStrategy[Ex], 

2249 condition: Callable[[Any], bool], 

2250 *, 

2251 settings: Optional[Settings] = None, 

2252 random: Optional[Random] = None, 

2253 database_key: Optional[bytes] = None, 

2254) -> Ex: 

2255 """Returns the minimal example from the given strategy ``specifier`` that 

2256 matches the predicate function ``condition``.""" 
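    # Illustrative usage (not part of this function).  Because results are
    # shrunk, the minimal value satisfying the predicate is expected back:
    #
    #     find(st.integers(), lambda x: x > 100)  # would be expected to return 101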

2257 if settings is None: 

2258 settings = Settings(max_examples=2000) 

2259 settings = Settings( 

2260 settings, suppress_health_check=list(HealthCheck), report_multiple_bugs=False 

2261 ) 

2262 

2263 if database_key is None and settings.database is not None: 

2264 # Note: the database key is not guaranteed to be unique. If it collides,

2265 # replaying database examples may fail to reproduce because they are

2266 # replayed against the wrong condition.

2267 database_key = function_digest(condition) 

2268 

2269 if not isinstance(specifier, SearchStrategy): 

2270 raise InvalidArgument( 

2271 f"Expected SearchStrategy but got {specifier!r} of " 

2272 f"type {type(specifier).__name__}" 

2273 ) 

2274 specifier.validate() 

2275 

2276 last: list[Ex] = [] 

2277 

2278 @settings 

2279 @given(specifier) 

2280 def test(v): 

2281 if condition(v): 

2282 last[:] = [v] 

2283 raise Found 

2284 

2285 if random is not None: 

2286 test = seed(random.getrandbits(64))(test) 

2287 

2288 # Aliasing as Any avoids mypy errors (attr-defined) when accessing and 

2289 # setting custom attributes on the decorated function or class. 

2290 _test: Any = test 

2291 _test._hypothesis_internal_is_find = True 

2292 _test._hypothesis_internal_database_key = database_key 

2293 

2294 try: 

2295 test() 

2296 except Found: 

2297 return last[0] 

2298 

2299 raise NoSuchExample(get_pretty_function_description(condition))