Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/core.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

795 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""This module provides the core primitives of Hypothesis, such as given.""" 

12import base64 

13import contextlib 

14import dataclasses 

15import datetime 

16import inspect 

17import io 

18import math 

19import os 

20import sys 

21import threading 

22import time 

23import traceback 

24import types 

25import unittest 

26import warnings 

27import zlib 

28from collections import defaultdict 

29from collections.abc import Callable, Coroutine, Generator, Hashable, Iterable, Sequence 

30from dataclasses import dataclass, field 

31from functools import partial 

32from inspect import Parameter 

33from random import Random 

34from threading import Lock 

35from types import EllipsisType 

36from typing import ( 

37 Any, 

38 BinaryIO, 

39 TypeVar, 

40 overload, 

41) 

42from unittest import TestCase 

43 

44from hypothesis import strategies as st 

45from hypothesis._settings import ( 

46 HealthCheck, 

47 Phase, 

48 Verbosity, 

49 all_settings, 

50 local_settings, 

51 settings as Settings, 

52) 

53from hypothesis.control import BuildContext, currently_in_test_context 

54from hypothesis.database import choices_from_bytes, choices_to_bytes 

55from hypothesis.errors import ( 

56 BackendCannotProceed, 

57 DeadlineExceeded, 

58 DidNotReproduce, 

59 FailedHealthCheck, 

60 FlakyFailure, 

61 FlakyReplay, 

62 Found, 

63 Frozen, 

64 HypothesisException, 

65 HypothesisWarning, 

66 InvalidArgument, 

67 NoSuchExample, 

68 StopTest, 

69 Unsatisfiable, 

70 UnsatisfiedAssumption, 

71) 

72from hypothesis.internal import observability 

73from hypothesis.internal.compat import ( 

74 PYPY, 

75 BaseExceptionGroup, 

76 add_note, 

77 bad_django_TestCase, 

78 get_type_hints, 

79 int_from_bytes, 

80) 

81from hypothesis.internal.conjecture.choice import ChoiceT 

82from hypothesis.internal.conjecture.data import ConjectureData, Status 

83from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner 

84from hypothesis.internal.conjecture.junkdrawer import ( 

85 ensure_free_stackframes, 

86 gc_cumulative_time, 

87) 

88from hypothesis.internal.conjecture.providers import ( 

89 BytestringProvider, 

90 PrimitiveProvider, 

91) 

92from hypothesis.internal.conjecture.shrinker import sort_key 

93from hypothesis.internal.entropy import deterministic_PRNG 

94from hypothesis.internal.escalation import ( 

95 InterestingOrigin, 

96 current_pytest_item, 

97 format_exception, 

98 get_trimmed_traceback, 

99 is_hypothesis_file, 

100) 

101from hypothesis.internal.healthcheck import fail_health_check 

102from hypothesis.internal.observability import ( 

103 InfoObservation, 

104 InfoObservationType, 

105 deliver_observation, 

106 make_testcase, 

107 observability_enabled, 

108) 

109from hypothesis.internal.reflection import ( 

110 convert_positional_arguments, 

111 define_function_signature, 

112 function_digest, 

113 get_pretty_function_description, 

114 get_signature, 

115 impersonate, 

116 is_mock, 

117 nicerepr, 

118 proxies, 

119 repr_call, 

120) 

121from hypothesis.internal.scrutineer import ( 

122 MONITORING_TOOL_ID, 

123 Trace, 

124 Tracer, 

125 explanatory_lines, 

126 tractable_coverage_report, 

127) 

128from hypothesis.internal.validation import check_type 

129from hypothesis.reporting import ( 

130 current_verbosity, 

131 report, 

132 verbose_report, 

133 with_reporter, 

134) 

135from hypothesis.statistics import describe_statistics, describe_targets, note_statistics 

136from hypothesis.strategies._internal.misc import NOTHING 

137from hypothesis.strategies._internal.strategies import ( 

138 Ex, 

139 SearchStrategy, 

140 check_strategy, 

141) 

142from hypothesis.utils.conventions import not_set 

143from hypothesis.utils.threading import ThreadLocal 

144from hypothesis.vendor.pretty import RepresentationPrinter 

145from hypothesis.version import __version__ 

146 

# Bound TypeVar so decorators such as @seed and @reproduce_failure preserve the
# precise callable type of the test function they wrap.
TestFunc = TypeVar("TestFunc", bound=Callable)


# Module-level flags toggled externally (presumably by the pytest plugin and/or
# CLI glue -- confirm against their setters, which are not visible in this file).
running_under_pytest = False
pytest_shows_exceptiongroups = True
# When not None, forces the PRNG seed for every wrapped test; consulted by
# get_random_for_wrapped_test() below.
global_force_seed = None
# `threadlocal` stores "engine-global" constants, which are global relative to a
# ConjectureRunner instance (roughly speaking). Since only one conjecture runner
# instance can be active per thread, making engine constants thread-local prevents
# the ConjectureRunner instances of concurrent threads from treading on each other.
threadlocal = ThreadLocal(_hypothesis_global_random=lambda: None)

158 

159 

160@dataclass(slots=True, frozen=False) 

161class Example: 

162 args: Any 

163 kwargs: Any 

164 # Plus two optional arguments for .xfail() 

165 raises: Any = field(default=None) 

166 reason: Any = field(default=None) 

167 

168 

169# TODO_DOCS link to not-yet-existent patch-dumping docs 

170 

171 

class example:
    """
    Add an explicit input to a Hypothesis test, which Hypothesis will always
    try before generating random inputs. This combines the randomized nature of
    Hypothesis generation with a traditional parametrized test.

    For example:

    .. code-block:: python

        @example("Hello world")
        @example("some string with special significance")
        @given(st.text())
        def test_strings(s):
            pass

    will call ``test_strings("Hello World")`` and
    ``test_strings("some string with special significance")`` before generating
    any random inputs. |@example| may be placed in any order relative to |@given|
    and |@settings|.

    Explicit inputs from |@example| are run in the |Phase.explicit| phase.
    Explicit inputs do not count towards |settings.max_examples|. Note that
    explicit inputs added by |@example| do not shrink. If an explicit input
    fails, Hypothesis will stop and report the failure without generating any
    random inputs.

    |@example| can also be used to easily reproduce a failure. For instance, if
    Hypothesis reports that ``f(n=[0, math.nan])`` fails, you can add
    ``@example(n=[0, math.nan])`` to your test to quickly reproduce that failure.

    Arguments to ``@example``
    -------------------------

    Arguments to |@example| have the same behavior and restrictions as arguments
    to |@given|. This means they may be either positional or keyword arguments
    (but not both in the same |@example|):

    .. code-block:: python

        @example(1, 2)
        @example(x=1, y=2)
        @given(st.integers(), st.integers())
        def test(x, y):
            pass

    Noting that while arguments to |@given| are strategies (like |st.integers|),
    arguments to |@example| are values instead (like ``1``).

    See the :ref:`given-arguments` section for full details.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Mirror @given's restriction: each @example is all-positional or
        # all-keyword, never a mixture, and never empty.
        if args and kwargs:
            raise InvalidArgument(
                "Cannot mix positional and keyword arguments for examples"
            )
        if not (args or kwargs):
            raise InvalidArgument("An example must provide at least one argument")

        # This list becomes the shared per-test collection of examples, but only
        # if this decorator is the first (innermost) @example applied to the test;
        # see __call__ below.
        self.hypothesis_explicit_examples: list[Example] = []
        self._this_example = Example(tuple(args), kwargs)

    def __call__(self, test: TestFunc) -> TestFunc:
        # Attach this example to the test and return the test unchanged.
        # The innermost @example installs the list; outer ones append to it.
        if not hasattr(test, "hypothesis_explicit_examples"):
            test.hypothesis_explicit_examples = self.hypothesis_explicit_examples  # type: ignore
        test.hypothesis_explicit_examples.append(self._this_example)  # type: ignore
        return test

    def xfail(
        self,
        condition: bool = True,  # noqa: FBT002
        *,
        reason: str = "",
        raises: type[BaseException] | tuple[type[BaseException], ...] = BaseException,
    ) -> "example":
        """Mark this example as an expected failure, similarly to
        :obj:`pytest.mark.xfail(strict=True) <pytest.mark.xfail>`.

        Expected-failing examples allow you to check that your test does fail on
        some examples, and therefore build confidence that *passing* tests are
        because your code is working, not because the test is missing something.

        .. code-block:: python

            @example(...).xfail()
            @example(...).xfail(reason="Prices must be non-negative")
            @example(...).xfail(raises=(KeyError, ValueError))
            @example(...).xfail(sys.version_info[:2] >= (3, 12), reason="needs py 3.12")
            @example(...).xfail(condition=sys.platform != "linux", raises=OSError)
            def test(x):
                pass

        .. note::

            Expected-failing examples are handled separately from those generated
            by strategies, so you should usually ensure that there is no overlap.

            .. code-block:: python

                @example(x=1, y=0).xfail(raises=ZeroDivisionError)
                @given(x=st.just(1), y=st.integers())  # Missing `.filter(bool)`!
                def test_fraction(x, y):
                    # This test will try the explicit example and see it fail as
                    # expected, then go on to generate more examples from the
                    # strategy. If we happen to generate y=0, the test will fail
                    # because only the explicit example is treated as xfailing.
                    x / y
        """
        check_type(bool, condition, "condition")
        check_type(str, reason, "reason")
        # `raises` must be either a single exception type or a *non-empty*
        # tuple of exception types.
        if not (
            isinstance(raises, type) and issubclass(raises, BaseException)
        ) and not (
            isinstance(raises, tuple)
            and raises  # () -> expected to fail with no error, which is impossible
            and all(
                isinstance(r, type) and issubclass(r, BaseException) for r in raises
            )
        ):
            raise InvalidArgument(
                f"{raises=} must be an exception type or tuple of exception types"
            )
        if condition:
            # Record the expectation by swapping in a new Example record; when
            # `condition` is false this call is a no-op and the example stays
            # an ordinary (expected-to-pass) one.
            self._this_example = dataclasses.replace(
                self._this_example, raises=raises, reason=reason
            )
        return self

    def via(self, whence: str, /) -> "example":
        """Attach a machine-readable label noting what the origin of this example
        was. |example.via| is completely optional and does not change runtime
        behavior.

        |example.via| is intended to support self-documenting behavior, as well as
        tooling which might add (or remove) |@example| decorators automatically.
        For example:

        .. code-block:: python

            # Annotating examples is optional and does not change runtime behavior
            @example(...)
            @example(...).via("regression test for issue #42")
            @example(...).via("discovered failure")
            def test(x):
                pass

        .. note::

            `HypoFuzz <https://hypofuzz.com/>`_ uses |example.via| to tag examples
            in the patch of its high-coverage set of explicit inputs, on
            `the patches page <https://hypofuzz.com/example-dashboard/#/patches>`_.
        """
        if not isinstance(whence, str):
            raise InvalidArgument(".via() must be passed a string")
        # This is deliberately a no-op at runtime; the tools operate on source code.
        return self

329 

330 

def seed(seed: Hashable) -> Callable[[TestFunc], TestFunc]:
    """
    Seed the randomness for this test.

    ``seed`` may be any hashable object. No exact meaning for ``seed`` is
    provided, other than that for a fixed seed value Hypothesis will produce
    the same examples (assuming that there are no other sources of
    nondeterminism, such as timing, hash randomization, or external state).

    For example, the following test function and |RuleBasedStateMachine| will
    each generate the same series of examples each time they are executed:

    .. code-block:: python

        @seed(1234)
        @given(st.integers())
        def test(n): ...

        @seed(6789)
        class MyMachine(RuleBasedStateMachine): ...

    If using pytest, you can alternatively pass ``--hypothesis-seed`` on the
    command line.

    Setting a seed overrides |settings.derandomize|, which is designed to enable
    deterministic CI tests rather than reproducing observed failures.

    Hypothesis will only print the seed which would reproduce a failure if a test
    fails in an unexpected way, for instance inside Hypothesis internals.
    """

    def accept(test):
        # Stash the explicit seed on the test function, and derive settings
        # with the example database disabled.
        test._hypothesis_internal_use_seed = seed
        existing = getattr(test, "_hypothesis_internal_use_settings", None)
        test._hypothesis_internal_use_settings = Settings(existing, database=None)
        return test

    return accept

371 

372 

373# TODO_DOCS: link to /explanation/choice-sequence 

374 

375 

def reproduce_failure(version: str, blob: bytes) -> Callable[[TestFunc], TestFunc]:
    """
    Run the example corresponding to the binary ``blob`` in order to reproduce a
    failure. ``blob`` is a serialized version of the internal input representation
    of Hypothesis.

    A test decorated with |@reproduce_failure| always runs exactly one example,
    which is expected to cause a failure. If the provided ``blob`` does not
    cause a failure, Hypothesis will raise |DidNotReproduce|.

    Hypothesis will print an |@reproduce_failure| decorator if
    |settings.print_blob| is ``True`` (which is the default in CI).

    |@reproduce_failure| is intended to be temporarily added to your test suite in
    order to reproduce a failure. It is not intended to be a permanent addition to
    your test suite. Because of this, no compatibility guarantees are made across
    Hypothesis versions, and |@reproduce_failure| will error if used on a different
    Hypothesis version than it was created for.

    .. seealso::

        See also the :doc:`/tutorial/replaying-failures` tutorial.
    """

    def accept(test):
        # Record the (version, blob) pair; the runner checks the version and
        # replays the decoded choice sequence when executing the test.
        test._hypothesis_internal_use_reproduce_failure = (version, blob)
        return test

    return accept

405 

406 

def reproduction_decorator(choices: Iterable[ChoiceT]) -> str:
    """Render the ``@reproduce_failure(...)`` source line for *choices*."""
    blob = encode_failure(choices)
    return f"@reproduce_failure({__version__!r}, {blob!r})"

409 

410 

def encode_failure(choices: Iterable[ChoiceT]) -> bytes:
    """Serialize a choice sequence to a base64 blob for @reproduce_failure.

    The first byte of the (pre-base64) payload records whether the body is
    zlib-compressed (b"\\1") or raw (b"\\0"); compression is used only when
    it actually shrinks the payload.
    """
    raw = choices_to_bytes(choices)
    squeezed = zlib.compress(raw)
    payload = b"\1" + squeezed if len(squeezed) < len(raw) else b"\0" + raw
    return base64.b64encode(payload)

419 

420 

def decode_failure(blob: bytes) -> Sequence[ChoiceT]:
    """Invert ``encode_failure``: base64-decode, maybe decompress, deserialize.

    Raises InvalidArgument for any malformed blob, chaining the underlying
    error where one exists.
    """
    try:
        decoded = base64.b64decode(blob)
    except Exception:
        raise InvalidArgument(f"Invalid base64 encoded string: {blob!r}") from None

    # The first byte says whether the rest is zlib-compressed (see encode_failure).
    prefix = decoded[:1]
    if prefix == b"\1":
        try:
            decoded = zlib.decompress(decoded[1:])
        except zlib.error as err:
            raise InvalidArgument(
                f"Invalid zlib compression for blob {blob!r}"
            ) from err
    elif prefix == b"\0":
        decoded = decoded[1:]
    else:
        raise InvalidArgument(
            f"Could not decode blob {blob!r}: Invalid start byte {prefix!r}"
        )

    choices = choices_from_bytes(decoded)
    if choices is None:
        raise InvalidArgument(f"Invalid serialized choice sequence for blob {blob!r}")

    return choices

447 

448 

def _invalid(message, *, exc=InvalidArgument, test, given_kwargs):
    """Return a stand-in for *test* that raises ``exc(message)`` when called.

    Used by ``is_invalid_test`` so that misuse of @given is reported lazily,
    only if and when the broken test actually runs.
    """

    @impersonate(test)
    def wrapped_test(*arguments, **kwargs):  # pragma: no cover # coverage limitation
        raise exc(message)

    # Mark the stand-in so it is still collected/treated as a Hypothesis test.
    wrapped_test.is_hypothesis_test = True
    wrapped_test.hypothesis = HypothesisHandle(
        inner_test=test,
        _get_fuzz_target=wrapped_test,
        _given_kwargs=given_kwargs,
    )
    return wrapped_test

461 

462 

def is_invalid_test(test, original_sig, given_arguments, given_kwargs):
    """Check the arguments to ``@given`` for basic usage constraints.

    Most errors are not raised immediately; instead we return a dummy test
    function that will raise the appropriate error if it is actually called.
    When the user runs a subset of tests (e.g via ``pytest -k``), errors will
    only be reported for tests that actually ran.

    Returns ``None`` (implicitly) when the arguments look valid.
    """
    # Pre-bind the context that every error path shares.
    invalid = partial(_invalid, test=test, given_kwargs=given_kwargs)

    if not (given_arguments or given_kwargs):
        return invalid("given must be called with at least one argument")

    params = list(original_sig.parameters.values())
    pos_params = [p for p in params if p.kind is p.POSITIONAL_OR_KEYWORD]
    kwonly_params = [p for p in params if p.kind is p.KEYWORD_ONLY]
    # Positional strategies are only allowed when *every* parameter is a plain
    # positional-or-keyword one; otherwise the positional mapping is ambiguous.
    if given_arguments and params != pos_params:
        return invalid(
            "positional arguments to @given are not supported with varargs, "
            "varkeywords, positional-only, or keyword-only arguments"
        )

    if len(given_arguments) > len(pos_params):
        return invalid(
            f"Too many positional arguments for {test.__name__}() were passed to "
            f"@given - expected at most {len(pos_params)} "
            f"arguments, but got {len(given_arguments)} {given_arguments!r}"
        )

    if ... in given_arguments:
        return invalid(
            "... was passed as a positional argument to @given, but may only be "
            "passed as a keyword argument or as the sole argument of @given"
        )

    if given_arguments and given_kwargs:
        return invalid("cannot mix positional and keyword arguments to @given")
    extra_kwargs = [
        k for k in given_kwargs if k not in {p.name for p in pos_params + kwonly_params}
    ]
    if extra_kwargs and (params == [] or params[-1].kind is not params[-1].VAR_KEYWORD):
        arg = extra_kwargs[0]
        extra = ""
        # A common mistake is passing a settings option (e.g. max_examples)
        # to @given; suggest the @settings spelling in that case.
        if arg in all_settings:
            extra = f". Did you mean @settings({arg}={given_kwargs[arg]!r})?"
        return invalid(
            f"{test.__name__}() got an unexpected keyword argument {arg!r}, "
            f"from `{arg}={given_kwargs[arg]!r}` in @given{extra}"
        )
    if any(p.default is not p.empty for p in params):
        return invalid("Cannot apply @given to a function with defaults.")

    # This case would raise Unsatisfiable *anyway*, but by detecting it here we can
    # provide a much more helpful error message for people e.g. using the Ghostwriter.
    empty = [
        f"{s!r} (arg {idx})" for idx, s in enumerate(given_arguments) if s is NOTHING
    ] + [f"{name}={s!r}" for name, s in given_kwargs.items() if s is NOTHING]
    if empty:
        strats = "strategies" if len(empty) > 1 else "strategy"
        return invalid(
            f"Cannot generate examples from empty {strats}: " + ", ".join(empty),
            exc=Unsatisfiable,
        )

526 

527 

def execute_explicit_examples(state, wrapped_test, arguments, kwargs, original_sig):
    """Run each input registered via @example, yielding any failures.

    This is a generator: for every failing explicit example it yields a
    ``(fragments_reported, error)`` pair back to the top-level runner, which
    aggregates them. Invalid @example usage raises InvalidArgument directly.
    """
    assert isinstance(state, StateForActualGivenExecution)
    posargs = [
        p.name
        for p in original_sig.parameters.values()
        if p.kind is p.POSITIONAL_OR_KEYWORD
    ]

    # Examples are attached innermost-first by the decorator, so iterate in
    # reverse to run them in source order.
    for example in reversed(getattr(wrapped_test, "hypothesis_explicit_examples", ())):
        assert isinstance(example, Example)
        # All of this validation is to check that @example() got "the same" arguments
        # as @given, i.e. corresponding to the same parameters, even though they might
        # be any mixture of positional and keyword arguments.
        if example.args:
            assert not example.kwargs
            if any(
                p.kind is p.POSITIONAL_ONLY for p in original_sig.parameters.values()
            ):
                raise InvalidArgument(
                    "Cannot pass positional arguments to @example() when decorating "
                    "a test function which has positional-only parameters."
                )
            if len(example.args) > len(posargs):
                raise InvalidArgument(
                    "example has too many arguments for test. Expected at most "
                    f"{len(posargs)} but got {len(example.args)}"
                )
            # Right-align the positional example args against the trailing
            # parameter names (leading parameters are supplied by the caller).
            example_kwargs = dict(
                zip(posargs[-len(example.args) :], example.args, strict=True)
            )
        else:
            example_kwargs = dict(example.kwargs)
        given_kws = ", ".join(
            repr(k) for k in sorted(wrapped_test.hypothesis._given_kwargs)
        )
        example_kws = ", ".join(repr(k) for k in sorted(example_kwargs))
        if given_kws != example_kws:
            raise InvalidArgument(
                f"Inconsistent args: @given() got strategies for {given_kws}, "
                f"but @example() got arguments for {example_kws}"
            ) from None

        # This is certainly true because the example_kwargs exactly match the params
        # reserved by @given(), which are then removed from the function signature.
        assert set(example_kwargs).isdisjoint(kwargs)
        example_kwargs.update(kwargs)

        # Validation above runs unconditionally; actual execution is gated on
        # the explicit phase being enabled.
        if Phase.explicit not in state.settings.phases:
            continue

        with local_settings(state.settings):
            fragments_reported = []
            empty_data = ConjectureData.for_choices([])
            try:
                execute_example = partial(
                    state.execute_once,
                    empty_data,
                    is_final=True,
                    print_example=True,
                    example_kwargs=example_kwargs,
                )
                with with_reporter(fragments_reported.append):
                    if example.raises is None:
                        execute_example()
                    else:
                        # @example(...).xfail(...)
                        # NOTE(review): there is no separator between the two
                        # joined parts when both are non-empty -- presumably
                        # `arguments` is empty on this path; confirm upstream.
                        bits = ", ".join(nicerepr(x) for x in arguments) + ", ".join(
                            f"{k}={nicerepr(v)}" for k, v in example_kwargs.items()
                        )
                        try:
                            execute_example()
                        except failure_exceptions_to_catch() as err:
                            if not isinstance(err, example.raises):
                                raise
                            # Save a string form of this example; we'll warn if it's
                            # ever generated by the strategy (which can't be xfailed)
                            state.xfail_example_reprs.add(
                                repr_call(state.test, arguments, example_kwargs)
                            )
                        except example.raises as err:
                            # We'd usually check this as early as possible, but it's
                            # possible for failure_exceptions_to_catch() to grow when
                            # e.g. pytest is imported between import- and test-time.
                            raise InvalidArgument(
                                f"@example({bits}) raised an expected {err!r}, "
                                "but Hypothesis does not treat this as a test failure"
                            ) from err
                        else:
                            # Unexpectedly passing; always raise an error in this case.
                            reason = f" because {example.reason}" * bool(example.reason)
                            if example.raises is BaseException:
                                name = "exception"  # special-case no raises= arg
                            elif not isinstance(example.raises, tuple):
                                name = example.raises.__name__
                            elif len(example.raises) == 1:
                                name = example.raises[0].__name__
                            else:
                                name = (
                                    ", ".join(ex.__name__ for ex in example.raises[:-1])
                                    + f", or {example.raises[-1].__name__}"
                                )
                            # "a" vs "an", depending on the exception name.
                            vowel = name.upper()[0] in "AEIOU"
                            raise AssertionError(
                                f"Expected a{'n' * vowel} {name} from @example({bits})"
                                f"{reason}, but no exception was raised."
                            )
            except UnsatisfiedAssumption:
                # Odd though it seems, we deliberately support explicit examples that
                # are then rejected by a call to `assume()`. As well as iterative
                # development, this is rather useful to replay Hypothesis' part of
                # a saved failure when other arguments are supplied by e.g. pytest.
                # See https://github.com/HypothesisWorks/hypothesis/issues/2125
                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
            except BaseException as err:
                # In order to support reporting of multiple failing examples, we yield
                # each of the (report text, error) pairs we find back to the top-level
                # runner. This also ensures that user-facing stack traces have as few
                # frames of Hypothesis internals as possible.
                err = err.with_traceback(get_trimmed_traceback())

                # One user error - whether misunderstanding or typo - we've seen a few
                # times is to pass strategies to @example() where values are expected.
                # Checking is easy, and false-positives not much of a problem, so:
                if isinstance(err, failure_exceptions_to_catch()) and any(
                    isinstance(arg, SearchStrategy)
                    for arg in example.args + tuple(example.kwargs.values())
                ):
                    new = HypothesisWarning(
                        "The @example() decorator expects to be passed values, but "
                        "you passed strategies instead. See https://hypothesis."
                        "readthedocs.io/en/latest/reference/api.html#hypothesis"
                        ".example for details."
                    )
                    new.__cause__ = err
                    err = new

                with contextlib.suppress(StopTest):
                    empty_data.conclude_test(Status.INVALID)
                yield (fragments_reported, err)
                # Keep going only if multiple-bug reporting is both enabled and
                # presentable, and the error is a genuine test failure rather
                # than a skip.
                if (
                    state.settings.report_multiple_bugs
                    and pytest_shows_exceptiongroups
                    and isinstance(err, failure_exceptions_to_catch())
                    and not isinstance(err, skip_exceptions_to_reraise())
                ):
                    continue
                break
            finally:
                # Rewrite the header so reports distinguish explicit examples
                # from generated ones.
                if fragments_reported:
                    assert fragments_reported[0].startswith("Falsifying example")
                    fragments_reported[0] = fragments_reported[0].replace(
                        "Falsifying example", "Falsifying explicit example", 1
                    )

                empty_data.freeze()
                if observability_enabled():
                    tc = make_testcase(
                        run_start=state._start_timestamp,
                        property=state.test_identifier,
                        data=empty_data,
                        how_generated="explicit example",
                        representation=state._string_repr,
                        timing=state._timing_features,
                    )
                    deliver_observation(tc)

            if fragments_reported:
                verbose_report(fragments_reported[0].replace("Falsifying", "Trying", 1))
                for f in fragments_reported[1:]:
                    verbose_report(f)

699 

700 

def get_random_for_wrapped_test(test, wrapped_test):
    """Choose the Random instance for one run of a wrapped test.

    Precedence: explicit @seed, then settings.derandomize (seeded from a
    digest of the test function), then the module-level forced seed, and
    finally a fresh seed drawn from the thread-local global Random. Only in
    the last case is the generated seed recorded on the wrapped test.
    """
    settings = wrapped_test._hypothesis_internal_use_settings
    wrapped_test._hypothesis_internal_use_generated_seed = None

    explicit_seed = wrapped_test._hypothesis_internal_use_seed
    if explicit_seed is not None:
        return Random(explicit_seed)

    if settings.derandomize:
        return Random(int_from_bytes(function_digest(test)))

    if global_force_seed is not None:
        return Random(global_force_seed)

    if threadlocal._hypothesis_global_random is None:  # pragma: no cover
        threadlocal._hypothesis_global_random = Random()
    fresh_seed = threadlocal._hypothesis_global_random.getrandbits(128)
    wrapped_test._hypothesis_internal_use_generated_seed = fresh_seed
    return Random(fresh_seed)

719 

720 

@dataclass(slots=True, frozen=False)
class Stuff:
    """Bundle of call-time state assembled by ``process_arguments_to_given``."""

    # The apparent bound object (`self`) detected from the first argument,
    # or None (also None when that value was a mock).
    selfy: Any
    # Positional arguments remaining after conversion to keyword form.
    args: tuple
    # Keyword arguments for the call.
    kwargs: dict
    # Mapping of parameter name -> strategy, as collected by @given.
    given_kwargs: dict

727 

728 

def process_arguments_to_given(
    wrapped_test: Any,
    arguments: Sequence[object],
    kwargs: dict[str, object],
    given_kwargs: dict[str, SearchStrategy],
    params: dict[str, Parameter],
) -> tuple[Sequence[object], dict[str, object], Stuff]:
    """Normalize the call-time arguments of a @given-wrapped test.

    Converts positional arguments to keyword form where possible, detects the
    bound object (``self``) if the test looks like a method, validates every
    strategy passed to @given, and bundles everything into a ``Stuff`` record.
    """
    arguments, kwargs = convert_positional_arguments(wrapped_test, arguments, kwargs)

    # If the test function is a method of some kind, the bound object will be
    # the first named argument if there are any, otherwise the first vararg
    # (if any).
    posargs = [p.name for p in params.values() if p.kind is p.POSITIONAL_OR_KEYWORD]
    if posargs:
        selfy = kwargs.get(posargs[0])
    elif arguments:
        selfy = arguments[0]
    else:
        selfy = None

    # Ensure that we don't mistake mocks for self here.
    # This can cause the mock to be used as the test runner.
    if is_mock(selfy):
        selfy = None

    arguments = tuple(arguments)

    with ensure_free_stackframes():
        for name, strategy in given_kwargs.items():
            check_strategy(strategy, name=name)
            strategy.validate()

    stuff = Stuff(selfy=selfy, args=arguments, kwargs=kwargs, given_kwargs=given_kwargs)
    return arguments, kwargs, stuff

763 

764 

def skip_exceptions_to_reraise():
    """Return a tuple of exceptions meaning 'skip this test', to re-raise.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request adding
    it to this function and to tests/cover/test_lazy_import.py
    """
    # We use this sys.modules trick to avoid importing libraries -
    # you can't be an instance of a type from an unimported module!
    # This is fast enough that we don't need to cache the result,
    # and more importantly it avoids possible side-effects :-)
    candidates = (
        ("unittest", "SkipTest"),
        ("_pytest.outcomes", "Skipped"),
    )
    # A set, in case any library simply re-exports another's Skip exception.
    found = {
        getattr(sys.modules[module_name], attr)
        for module_name, attr in candidates
        if module_name in sys.modules
    }
    return tuple(sorted(found, key=str))

783 

784 

def failure_exceptions_to_catch() -> tuple[type[BaseException], ...]:
    """Return a tuple of exceptions meaning 'this test has failed', to catch.

    This is intended to cover most common test runners; if you would
    like another to be added please open an issue or pull request.
    """
    # While SystemExit and GeneratorExit are instances of BaseException, we also
    # expect them to be deterministic - unlike KeyboardInterrupt - and so we treat
    # them as standard exceptions, check for flakiness, etc.
    # See https://github.com/HypothesisWorks/hypothesis/issues/2223 for details.
    caught: list[type[BaseException]] = [Exception, SystemExit, GeneratorExit]
    pytest_outcomes = sys.modules.get("_pytest.outcomes")
    if pytest_outcomes is not None:
        caught.append(pytest_outcomes.Failed)
    return tuple(caught)

799 

800 

def new_given_signature(original_sig, given_kwargs):
    """Make an updated signature for the wrapped test.

    Drops every parameter that @given supplies via a strategy (positional-or-
    keyword and keyword-only kinds only), and clears the return annotation.
    """
    remaining = []
    for param in original_sig.parameters.values():
        consumed_by_given = param.name in given_kwargs and param.kind in (
            param.POSITIONAL_OR_KEYWORD,
            param.KEYWORD_ONLY,
        )
        if not consumed_by_given:
            remaining.append(param)
    return original_sig.replace(parameters=remaining, return_annotation=None)

814 

815 

def default_executor(data, function):
    """Trivial executor: call *function* on *data* with no setup or teardown."""
    return function(data)


def get_executor(runner):
    """Select an executor for *runner*, honoring its customization hooks.

    Preference order: a ``runner.execute_example`` method; otherwise
    ``setup_example``/``teardown_example`` hooks wrapped in try/finally;
    otherwise the plain ``default_executor``.
    """
    try:
        execute_example = runner.execute_example
    except AttributeError:
        pass
    else:
        return lambda data, function: execute_example(partial(function, data))

    if hasattr(runner, "setup_example") or hasattr(runner, "teardown_example"):
        # Either hook may be absent; substitute no-ops so both are callable.
        setup = getattr(runner, "setup_example", None) or (lambda: None)
        teardown = getattr(runner, "teardown_example", None) or (lambda ex: None)

        def execute(data, function):
            token = None
            try:
                token = setup()
                return function(data)
            finally:
                # Runs even if setup() itself raised, in which case token is None.
                teardown(token)

        return execute

    return default_executor

843 

844 

845# This function is a crude solution, a better way of resolving it would probably 

846# be to rewrite a bunch of exception handlers to use except*. 

847T = TypeVar("T", bound=BaseException) 

848 

849 

850def _flatten_group(excgroup: BaseExceptionGroup[T]) -> list[T]: 

851 found_exceptions: list[T] = [] 

852 for exc in excgroup.exceptions: 

853 if isinstance(exc, BaseExceptionGroup): 

854 found_exceptions.extend(_flatten_group(exc)) 

855 else: 

856 found_exceptions.append(exc) 

857 return found_exceptions 

858 

859 

@contextlib.contextmanager
def unwrap_markers_from_group() -> Generator[None, None, None]:
    """Unwrap Hypothesis marker exceptions out of ``BaseExceptionGroup``s.

    Engine control-flow ("marker") exceptions such as ``StopTest`` may reach
    us wrapped in an exception group. This context manager re-raises a single
    marker exception from such groups so the engine's handlers can see it,
    while re-raising the group unchanged whenever it contains genuine user
    errors (or only ``Frozen``).
    """
    try:
        yield
    except BaseExceptionGroup as excgroup:
        _frozen_exceptions, non_frozen_exceptions = excgroup.split(Frozen)

        # group only contains Frozen, reraise the group
        # it doesn't matter what we raise, since any exceptions get disregarded
        # and reraised as StopTest if data got frozen.
        if non_frozen_exceptions is None:
            raise
        # in all other cases they are discarded

        # Can RewindRecursive end up in this group?
        _, user_exceptions = non_frozen_exceptions.split(
            lambda e: isinstance(e, (StopTest, HypothesisException))
        )

        # this might contain marker exceptions, or internal errors, but not frozen.
        if user_exceptions is not None:
            raise

        # single marker exception - reraise it
        flattened_non_frozen_exceptions: list[BaseException] = _flatten_group(
            non_frozen_exceptions
        )
        if len(flattened_non_frozen_exceptions) == 1:
            e = flattened_non_frozen_exceptions[0]
            # preserve the cause of the original exception to not hinder debugging
            # note that __context__ is still lost though
            raise e from e.__cause__

        # multiple marker exceptions. If we re-raise the whole group we break
        # a bunch of logic so ....?
        stoptests, non_stoptests = non_frozen_exceptions.split(StopTest)

        # TODO: stoptest+hypothesisexception ...? Is it possible? If so, what do?

        if non_stoptests:
            # TODO: multiple marker exceptions is easy to produce, but the logic in the
            # engine does not handle it... so we just reraise the first one for now.
            e = _flatten_group(non_stoptests)[0]
            raise e from e.__cause__
        assert stoptests is not None

        # multiple stoptests: raising the one with the lowest testcounter
        raise min(_flatten_group(stoptests), key=lambda s_e: s_e.testcounter)

909 

class StateForActualGivenExecution:
    """Per-test-function execution state for a ``@given`` test.

    Wires together the argument strategies (``stuff``), the user's test
    function, settings, and RNG, and accumulates results across many
    executions: falsifying examples, timing features, and coverage traces
    for explain mode.
    """

    def __init__(
        self, stuff, test, settings, random, wrapped_test, *, thread_overlap=None
    ):
        self.stuff = stuff
        self.test = test
        self.settings = settings
        self.random = random
        self.wrapped_test = wrapped_test
        # Shared mapping of thread ident -> overlap flag; execute_once skips
        # deadline enforcement for threads flagged here, and it is passed on
        # to the ConjectureRunner in run_engine.
        self.thread_overlap = {} if thread_overlap is None else thread_overlap

        # Executor hook derived from the test's `self`, if any (see get_executor).
        self.test_runner = get_executor(stuff.selfy)
        self.print_given_args = getattr(
            wrapped_test, "_hypothesis_internal_print_given_args", True
        )

        self.last_exception = None
        self.falsifying_examples = ()
        self.ever_executed = False
        self.xfail_example_reprs = set()
        self.files_to_propagate = set()
        # Set when a test failure was recorded via mark_interesting, and
        # whether that failure was (only) a DeadlineExceeded.
        self.failed_normally = False
        self.failed_due_to_deadline = False

        # interesting-origin -> set of frozensets of covered branches,
        # consumed by explanatory_lines() in run_engine (explain phase).
        self.explain_traces = defaultdict(set)
        self._start_timestamp = time.time()
        self._string_repr = ""
        self._timing_features = {}

938 

939 @property 

940 def test_identifier(self) -> str: 

941 return getattr( 

942 current_pytest_item.value, "nodeid", None 

943 ) or get_pretty_function_description(self.wrapped_test) 

944 

945 def _should_trace(self): 

946 # NOTE: we explicitly support monkeypatching this. Keep the namespace 

947 # access intact. 

948 _trace_obs = ( 

949 observability_enabled() and observability.OBSERVABILITY_COLLECT_COVERAGE 

950 ) 

951 _trace_failure = ( 

952 self.failed_normally 

953 and not self.failed_due_to_deadline 

954 and {Phase.shrink, Phase.explain}.issubset(self.settings.phases) 

955 ) 

956 return _trace_obs or _trace_failure 

957 

    def execute_once(
        self,
        data,
        *,
        print_example=False,
        is_final=False,
        expected_failure=None,
        example_kwargs=None,
    ):
        """Run the test function once, using ``data`` as input.

        If the test raises an exception, it will propagate through to the
        caller of this method. Depending on its type, this could represent
        an ordinary test failure, or a fatal error, or a control exception.

        If this method returns normally, the test might have passed, or
        it might have placed ``data`` in an unsuccessful state and then
        swallowed the corresponding control exception.
        """

        self.ever_executed = True

        self._string_repr = ""
        text_repr = None
        if self.settings.deadline is None and not observability_enabled():
            # Fast path: no deadline and no observability means we don't need
            # any timing bookkeeping around the test body.

            @proxies(self.test)
            def test(*args, **kwargs):
                with unwrap_markers_from_group(), ensure_free_stackframes():
                    return self.test(*args, **kwargs)

        else:
            # Timing wrapper: measure wall-clock time for the test body, then
            # subtract draw, stateful-step, and GC time so the deadline applies
            # only to the user's own code.

            @proxies(self.test)
            def test(*args, **kwargs):
                arg_drawtime = math.fsum(data.draw_times.values())
                arg_stateful = math.fsum(data._stateful_run_times.values())
                arg_gctime = gc_cumulative_time()
                with unwrap_markers_from_group(), ensure_free_stackframes():
                    start = time.perf_counter()
                    try:
                        result = self.test(*args, **kwargs)
                    finally:
                        finish = time.perf_counter()
                        in_drawtime = math.fsum(data.draw_times.values()) - arg_drawtime
                        in_stateful = (
                            math.fsum(data._stateful_run_times.values()) - arg_stateful
                        )
                        in_gctime = gc_cumulative_time() - arg_gctime
                        runtime = finish - start - in_drawtime - in_stateful - in_gctime
                        self._timing_features = {
                            "execute:test": runtime,
                            "overall:gc": in_gctime,
                            **data.draw_times,
                            **data._stateful_run_times,
                        }

                if (
                    (current_deadline := self.settings.deadline) is not None
                    # we disable the deadline check under concurrent threads, since
                    # cpython may switch away from a thread for arbitrarily long.
                    and not self.thread_overlap.get(threading.get_ident(), False)
                ):
                    if not is_final:
                        # Non-final runs get a 25% grace margin (deadline * 5/4);
                        # only the final replay enforces the deadline exactly.
                        current_deadline = (current_deadline // 4) * 5
                    if runtime >= current_deadline.total_seconds():
                        raise DeadlineExceeded(
                            datetime.timedelta(seconds=runtime), self.settings.deadline
                        )
                return result

        def run(data: ConjectureData) -> None:
            # Set up dynamic context needed by a single test run.
            if self.stuff.selfy is not None:
                data.hypothesis_runner = self.stuff.selfy
            # Generate all arguments to the test function.
            args = self.stuff.args
            kwargs = dict(self.stuff.kwargs)
            if example_kwargs is None:
                kw, argslices = context.prep_args_kwargs_from_strategies(
                    self.stuff.given_kwargs
                )
            else:
                # Explicit @example values: nothing was drawn, so no slices.
                kw = example_kwargs
                argslices = {}
            kwargs.update(kw)
            if expected_failure is not None:
                nonlocal text_repr
                text_repr = repr_call(test, args, kwargs)

            if print_example or current_verbosity() >= Verbosity.verbose:
                printer = RepresentationPrinter(context=context)
                if print_example:
                    printer.text("Falsifying example:")
                else:
                    printer.text("Trying example:")

                if self.print_given_args:
                    printer.text(" ")
                    printer.repr_call(
                        test.__name__,
                        args,
                        kwargs,
                        force_split=True,
                        arg_slices=argslices,
                        leading_comment=(
                            "# " + context.data.slice_comments[(0, 0)]
                            if (0, 0) in context.data.slice_comments
                            else None
                        ),
                        avoid_realization=data.provider.avoid_realization,
                    )
                report(printer.getvalue())

            if observability_enabled():
                # Build the call representation for the observability payload,
                # independently of verbosity-driven reporting above.
                printer = RepresentationPrinter(context=context)
                printer.repr_call(
                    test.__name__,
                    args,
                    kwargs,
                    force_split=True,
                    arg_slices=argslices,
                    leading_comment=(
                        "# " + context.data.slice_comments[(0, 0)]
                        if (0, 0) in context.data.slice_comments
                        else None
                    ),
                    avoid_realization=data.provider.avoid_realization,
                )
                self._string_repr = printer.getvalue()

            try:
                return test(*args, **kwargs)
            except TypeError as e:
                # If we sampled from a sequence of strategies, AND failed with a
                # TypeError, *AND that exception mentions SearchStrategy*, add a note:
                if (
                    "SearchStrategy" in str(e)
                    and data._sampled_from_all_strategies_elements_message is not None
                ):
                    msg, format_arg = data._sampled_from_all_strategies_elements_message
                    add_note(e, msg.format(format_arg))
                raise
            finally:
                if data._stateful_repr_parts is not None:
                    # Stateful tests build their repr step-by-step; prefer that.
                    self._string_repr = "\n".join(data._stateful_repr_parts)

                if observability_enabled():
                    # Append interactive draws (`data.draw(...)` inside the test)
                    # to the representation shown in observability output.
                    printer = RepresentationPrinter(context=context)
                    for name, value in data._observability_args.items():
                        if name.startswith("generate:Draw "):
                            try:
                                value = data.provider.realize(value)
                            except BackendCannotProceed:  # pragma: no cover
                                value = "<backend failed to realize symbolic>"
                            printer.text(f"\n{name.removeprefix('generate:')}: ")
                            printer.pretty(value)

                    self._string_repr += printer.getvalue()

        # self.test_runner can include the execute_example method, or setup/teardown
        # _example, so it's important to get the PRNG and build context in place first.
        with (
            local_settings(self.settings),
            deterministic_PRNG(),
            BuildContext(
                data, is_final=is_final, wrapped_test=self.wrapped_test
            ) as context,
        ):
            # providers may throw in per_case_context_fn, and we'd like
            # `result` to still be set in these cases.
            result = None
            with data.provider.per_test_case_context_manager():
                # Run the test function once, via the executor hook.
                # In most cases this will delegate straight to `run(data)`.
                result = self.test_runner(data, run)

        # If a failure was expected, it should have been raised already, so
        # instead raise an appropriate diagnostic error.
        if expected_failure is not None:
            exception, traceback = expected_failure
            if isinstance(exception, DeadlineExceeded) and (
                runtime_secs := math.fsum(
                    v
                    for k, v in self._timing_features.items()
                    if k.startswith("execute:")
                )
            ):
                report(
                    "Unreliable test timings! On an initial run, this "
                    f"test took {exception.runtime.total_seconds() * 1000:.2f}ms, "
                    "which exceeded the deadline of "
                    f"{self.settings.deadline.total_seconds() * 1000:.2f}ms, but "
                    f"on a subsequent run it took {runtime_secs * 1000:.2f} ms, "
                    "which did not. If you expect this sort of "
                    "variability in your test timings, consider turning "
                    "deadlines off for this test by setting deadline=None."
                )
            else:
                report("Failed to reproduce exception. Expected: \n" + traceback)
            raise FlakyFailure(
                f"Hypothesis {text_repr} produces unreliable results: "
                "Falsified on the first call but did not on a subsequent one",
                [exception],
            )
        return result

1164 

1165 def _flaky_replay_to_failure( 

1166 self, err: FlakyReplay, context: BaseException 

1167 ) -> FlakyFailure: 

1168 # Note that in the mark_interesting case, _context_ itself 

1169 # is part of err._interesting_examples - but it's not in 

1170 # _runner.interesting_examples - this is fine, as the context 

1171 # (i.e., immediate exception) is appended. 

1172 interesting_examples = [ 

1173 self._runner.interesting_examples[origin] 

1174 for origin in err._interesting_origins 

1175 if origin in self._runner.interesting_examples 

1176 ] 

1177 exceptions = [result.expected_exception for result in interesting_examples] 

1178 exceptions.append(context) # the immediate exception 

1179 return FlakyFailure(err.reason, exceptions) 

1180 

    def _execute_once_for_engine(self, data: ConjectureData) -> None:
        """Wrapper around ``execute_once`` that intercepts test failure
        exceptions and single-test control exceptions, and turns them into
        appropriate method calls to `data` instead.

        This allows the engine to assume that any exception other than
        ``StopTest`` must be a fatal error, and should stop the entire engine.
        """
        trace: Trace = set()
        try:
            with Tracer(should_trace=self._should_trace()) as tracer:
                try:
                    result = self.execute_once(data)
                    if (
                        data.status == Status.VALID and tracer.branches
                    ):  # pragma: no cover
                        # This is in fact covered by our *non-coverage* tests, but due
                        # to the settrace() contention *not* by our coverage tests.
                        # The None key holds traces of passing runs, for explain mode.
                        self.explain_traces[None].add(frozenset(tracer.branches))
                finally:
                    trace = tracer.branches
            if result is not None:
                fail_health_check(
                    self.settings,
                    "Tests run under @given should return None, but "
                    f"{self.test.__name__} returned {result!r} instead.",
                    HealthCheck.return_value,
                )
        except UnsatisfiedAssumption as e:
            # An "assume" check failed, so instead we inform the engine that
            # this test run was invalid.
            try:
                data.mark_invalid(e.reason)
            except FlakyReplay as err:
                # This was unexpected, meaning that the assume was flaky.
                # Report it as such.
                raise self._flaky_replay_to_failure(err, e) from None
        except (StopTest, BackendCannotProceed):
            # The engine knows how to handle this control exception, so it's
            # OK to re-raise it.
            raise
        except (
            FailedHealthCheck,
            *skip_exceptions_to_reraise(),
        ):
            # These are fatal errors or control exceptions that should stop the
            # engine, so we re-raise them.
            raise
        except failure_exceptions_to_catch() as e:
            # If an unhandled (i.e., non-Hypothesis) error was raised by
            # Hypothesis-internal code, re-raise it as a fatal error instead
            # of treating it as a test failure.
            if isinstance(e, BaseExceptionGroup) and len(e.exceptions) == 1:
                # When a naked exception is implicitly wrapped in an ExceptionGroup
                # due to a re-raising "except*", the ExceptionGroup is constructed in
                # the caller's stack frame (see #4183). This workaround is specifically
                # for implicit wrapping of naked exceptions by "except*", since explicit
                # raising of ExceptionGroup gets the proper traceback in the first place
                # - there's no need to handle hierarchical groups here, at least if no
                # such implicit wrapping happens inside hypothesis code (we only care
                # about the hypothesis-or-not distinction).
                #
                # 01-25-2025: this was patched to give the correct
                # stacktrace in cpython https://github.com/python/cpython/issues/128799.
                # can remove once python3.11 is EOL.
                tb = e.exceptions[0].__traceback__ or e.__traceback__
            else:
                tb = e.__traceback__
            # Classify by where the innermost frame lives: errors raised from
            # Hypothesis's own files are internal bugs, not test failures.
            filepath = traceback.extract_tb(tb)[-1][0]
            if (
                is_hypothesis_file(filepath)
                and not isinstance(e, HypothesisException)
                # We expect backend authors to use the provider_conformance test
                # to test their backends. If an error occurs there, it is probably
                # from their backend, and we would like to treat it as a standard
                # error, not a hypothesis-internal error.
                and not filepath.endswith(
                    f"internal{os.sep}conjecture{os.sep}provider_conformance.py"
                )
            ):
                raise

            if data.frozen:
                # This can happen if an error occurred in a finally
                # block somewhere, suppressing our original StopTest.
                # We raise a new one here to resume normal operation.
                raise StopTest(data.testcounter) from e
            else:
                # The test failed by raising an exception, so we inform the
                # engine that this test run was interesting. This is the normal
                # path for test runs that fail.
                tb = get_trimmed_traceback()
                data.expected_traceback = format_exception(e, tb)
                data.expected_exception = e
                assert data.expected_traceback is not None  # for mypy
                verbose_report(data.expected_traceback)

                self.failed_normally = True

                interesting_origin = InterestingOrigin.from_exception(e)
                if trace:  # pragma: no cover
                    # Trace collection is explicitly disabled under coverage.
                    self.explain_traces[interesting_origin].add(frozenset(trace))
                if interesting_origin.exc_type == DeadlineExceeded:
                    self.failed_due_to_deadline = True
                    self.explain_traces.clear()
                try:
                    data.mark_interesting(interesting_origin)
                except FlakyReplay as err:
                    raise self._flaky_replay_to_failure(err, e) from None

        finally:
            # Conditional here so we can save some time constructing the payload; in
            # other cases (without coverage) it's cheap enough to do that regardless.
            if observability_enabled():
                if runner := getattr(self, "_runner", None):
                    phase = runner._current_phase
                else:  # pragma: no cover  # in case of messing with internals
                    if self.failed_normally or self.failed_due_to_deadline:
                        phase = "shrink"
                    else:
                        phase = "unknown"
                # `str * bool` yields "" when the condition is False, so the
                # backend is only mentioned when it differs from the default.
                backend_desc = f", using backend={self.settings.backend!r}" * (
                    self.settings.backend != "hypothesis"
                    and not getattr(runner, "_switch_to_hypothesis_provider", False)
                )
                # Realize any symbolic values from alternative backends before
                # serializing them into the observation; fall back to empty/
                # placeholder values if the backend cannot.
                try:
                    data._observability_args = data.provider.realize(
                        data._observability_args
                    )
                except BackendCannotProceed:
                    data._observability_args = {}

                try:
                    self._string_repr = data.provider.realize(self._string_repr)
                except BackendCannotProceed:
                    self._string_repr = "<backend failed to realize symbolic arguments>"

                try:
                    data.events = data.provider.realize(data.events)
                except BackendCannotProceed:
                    data.events = {}

                data.freeze()
                tc = make_testcase(
                    run_start=self._start_timestamp,
                    property=self.test_identifier,
                    data=data,
                    how_generated=f"during {phase} phase{backend_desc}",
                    representation=self._string_repr,
                    arguments=data._observability_args,
                    timing=self._timing_features,
                    coverage=tractable_coverage_report(trace) or None,
                    phase=phase,
                    backend_metadata=data.provider.observe_test_case(),
                )
                deliver_observation(tc)

                for msg in data.provider.observe_information_messages(
                    lifetime="test_case"
                ):
                    self._deliver_information_message(**msg)
            # Reset per-run timing so stale values never leak into the next run.
            self._timing_features = {}

1344 

1345 def _deliver_information_message( 

1346 self, *, type: InfoObservationType, title: str, content: str | dict 

1347 ) -> None: 

1348 deliver_observation( 

1349 InfoObservation( 

1350 type=type, 

1351 run_start=self._start_timestamp, 

1352 property=self.test_identifier, 

1353 title=title, 

1354 content=content, 

1355 ) 

1356 ) 

1357 

    def run_engine(self):
        """Run the test function many times, on database input and generated
        input, using the Conjecture engine.

        Raises the final, user-facing failure (via ``_raise_to_user``) when
        falsifying examples were found, or ``Unsatisfiable`` when no valid
        examples could be generated at all.
        """
        # Tell pytest to omit the body of this function from tracebacks
        __tracebackhide__ = True
        try:
            database_key = self.wrapped_test._hypothesis_internal_database_key
        except AttributeError:
            if global_force_seed is None:
                database_key = function_digest(self.test)
            else:
                # With a forced global seed we deliberately skip the database.
                database_key = None

        runner = self._runner = ConjectureRunner(
            self._execute_once_for_engine,
            settings=self.settings,
            random=self.random,
            database_key=database_key,
            thread_overlap=self.thread_overlap,
        )
        # Use the Conjecture engine to run the test function many times
        # on different inputs.
        runner.run()
        note_statistics(runner.statistics)
        if observability_enabled():
            self._deliver_information_message(
                type="info",
                title="Hypothesis Statistics",
                content=describe_statistics(runner.statistics),
            )
            for msg in (
                p if isinstance(p := runner.provider, PrimitiveProvider) else p(None)
            ).observe_information_messages(lifetime="test_function"):
                self._deliver_information_message(**msg)

        if runner.call_count == 0:
            return
        if runner.interesting_examples:
            self.falsifying_examples = sorted(
                runner.interesting_examples.values(),
                key=lambda d: sort_key(d.nodes),
                reverse=True,
            )
        else:
            if runner.valid_examples == 0:
                explanations = []
                # use a somewhat arbitrary cutoff to avoid recommending spurious
                # fixes.
                # eg, a few invalid examples from internal filters when the
                # problem is the user generating large inputs, or a
                # few overruns during internal mutation when the problem is
                # impossible user filters/assumes.
                if runner.invalid_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.invalid_examples} of {runner.call_count} "
                        "examples failed a .filter() or assume() condition. Try "
                        "making your filters or assumes less strict, or rewrite "
                        "using strategy parameters: "
                        "st.integers().filter(lambda x: x > 0) fails less often "
                        "(that is, never) when rewritten as st.integers(min_value=1)."
                    )
                if runner.overrun_examples > min(20, runner.call_count // 5):
                    explanations.append(
                        f"{runner.overrun_examples} of {runner.call_count} "
                        "examples were too large to finish generating; try "
                        "reducing the typical size of your inputs?"
                    )
                rep = get_pretty_function_description(self.test)
                raise Unsatisfiable(
                    f"Unable to satisfy assumptions of {rep}. "
                    f"{' Also, '.join(explanations)}"
                )

        # If we have not traced executions, warn about that now (but only when
        # we'd expect to do so reliably, i.e. on CPython>=3.12)
        if (
            hasattr(sys, "monitoring")
            and not PYPY
            and self._should_trace()
            and not Tracer.can_trace()
        ):  # pragma: no cover
            # actually covered by our tests, but only on >= 3.12
            warnings.warn(
                "avoiding tracing test function because tool id "
                f"{MONITORING_TOOL_ID} is already taken by tool "
                f"{sys.monitoring.get_tool(MONITORING_TOOL_ID)}.",
                HypothesisWarning,
                stacklevel=3,
            )

        if not self.falsifying_examples:
            return
        elif not (self.settings.report_multiple_bugs and pytest_shows_exceptiongroups):
            # Pretend that we only found one failure, by discarding the others.
            del self.falsifying_examples[:-1]

        # The engine found one or more failures, so we need to reproduce and
        # report them.

        errors_to_report = []

        report_lines = describe_targets(runner.best_observed_targets)
        if report_lines:
            report_lines.append("")

        explanations = explanatory_lines(self.explain_traces, self.settings)
        for falsifying_example in self.falsifying_examples:
            fragments = []

            # Replay the minimal failing example on a fresh ConjectureData,
            # so we can print it and confirm the failure reproduces.
            ran_example = runner.new_conjecture_data(
                falsifying_example.choices, max_choices=len(falsifying_example.choices)
            )
            ran_example.slice_comments = falsifying_example.slice_comments
            tb = None
            origin = None
            assert falsifying_example.expected_exception is not None
            assert falsifying_example.expected_traceback is not None
            try:
                with with_reporter(fragments.append):
                    self.execute_once(
                        ran_example,
                        print_example=True,
                        is_final=True,
                        expected_failure=(
                            falsifying_example.expected_exception,
                            falsifying_example.expected_traceback,
                        ),
                    )
            except StopTest as e:
                # Link the expected exception from the first run. Not sure
                # how to access the current exception, if it failed
                # differently on this run. In fact, in the only known
                # reproducer, the StopTest is caused by OVERRUN before the
                # test is even executed. Possibly because all initial examples
                # failed until the final non-traced replay, and something was
                # exhausted? Possibly a FIXME, but sufficiently weird to
                # ignore for now.
                err = FlakyFailure(
                    "Inconsistent results: An example failed on the "
                    "first run but now succeeds (or fails with another "
                    "error, or is for some reason not runnable).",
                    # (note: e is a BaseException)
                    [falsifying_example.expected_exception or e],
                )
                errors_to_report.append((fragments, err))
            except UnsatisfiedAssumption as e:  # pragma: no cover  # ironically flaky
                err = FlakyFailure(
                    "Unreliable assumption: An example which satisfied "
                    "assumptions on the first run now fails it.",
                    [e],
                )
                errors_to_report.append((fragments, err))
            except BaseException as e:
                # If we have anything for explain-mode, this is the time to report.
                fragments.extend(explanations[falsifying_example.interesting_origin])
                errors_to_report.append(
                    (fragments, e.with_traceback(get_trimmed_traceback()))
                )
                tb = format_exception(e, get_trimmed_traceback(e))
                origin = InterestingOrigin.from_exception(e)
            else:
                # execute_once() will always raise either the expected error, or Flaky.
                raise NotImplementedError("This should be unreachable")
            finally:
                ran_example.freeze()
                if observability_enabled():
                    # log our observability line for the final failing example
                    tc = make_testcase(
                        run_start=self._start_timestamp,
                        property=self.test_identifier,
                        data=ran_example,
                        how_generated="minimal failing example",
                        representation=self._string_repr,
                        arguments=ran_example._observability_args,
                        timing=self._timing_features,
                        coverage=None,  # Not recorded when we're replaying the MFE
                        status="passed" if sys.exc_info()[0] else "failed",
                        status_reason=str(origin or "unexpected/flaky pass"),
                        metadata={"traceback": tb},
                    )
                    deliver_observation(tc)

            # Whether or not replay actually raised the exception again, we want
            # to print the reproduce_failure decorator for the failing example.
            if self.settings.print_blob:
                fragments.append(
                    "\nYou can reproduce this example by temporarily adding "
                    f"{reproduction_decorator(falsifying_example.choices)} "
                    "as a decorator on your test case"
                )

        _raise_to_user(
            errors_to_report,
            self.settings,
            report_lines,
            # A backend might report a failure and then report verified afterwards,
            # which is to be interpreted as "there are no more failures *other
            # than what we already reported*". Do not report this as unsound.
            unsound_backend=(
                runner._verified_by
                if runner._verified_by and not runner._backend_found_failure
                else None
            ),
        )

1563 

1564 

def _raise_to_user(
    errors_to_report, settings, target_lines, trailer="", *, unsound_backend=None
):
    """Helper function for attaching notes and grouping multiple errors.

    ``errors_to_report`` is a non-empty list of ``(fragments, error)`` pairs;
    each fragment becomes a note on its error. A single error is raised
    directly; multiple errors are wrapped in a ``BaseExceptionGroup``.
    """
    failing_prefix = "Falsifying example: "
    ls = []
    for fragments, err in errors_to_report:
        for note in fragments:
            add_note(err, note)
            if note.startswith(failing_prefix):
                # Collect the bare falsifying-example reprs for the pytest plugin.
                ls.append(note.removeprefix(failing_prefix))
    if current_pytest_item.value:
        current_pytest_item.value._hypothesis_failing_examples = ls

    if len(errors_to_report) == 1:
        _, the_error_hypothesis_found = errors_to_report[0]
    else:
        assert errors_to_report
        the_error_hypothesis_found = BaseExceptionGroup(
            f"Hypothesis found {len(errors_to_report)} distinct failures{trailer}.",
            [e for _, e in errors_to_report],
        )

    if settings.verbosity >= Verbosity.normal:
        for line in target_lines:
            add_note(the_error_hypothesis_found, line)

    if unsound_backend:
        # NOTE(review): `err` here is the loop variable leaked from the
        # for-loop above, i.e. the *last* reported error - not necessarily
        # `the_error_hypothesis_found`. Confirm whether attaching the note to
        # the last error (rather than the raised group) is intentional.
        add_note(
            err,
            f"backend={unsound_backend!r} claimed to verify this test passes - "
            "please send them a bug report!",
        )

    raise the_error_hypothesis_found

1600 

1601 

@contextlib.contextmanager
def fake_subTest(self, msg=None, **__):
    """Monkeypatch for `unittest.TestCase.subTest` during `@given`.

    If we don't patch this out, each failing example is reported as a
    separate failing test by the unittest test runner, which is
    obviously incorrect. We therefore replace it for the duration with
    this version.
    """
    message = (
        "subTest per-example reporting interacts badly with Hypothesis "
        "trying hundreds of examples, so we disable it for the duration of "
        "any test that uses `@given`."
    )
    warnings.warn(message, HypothesisWarning, stacklevel=2)
    yield

1619 

1620 

1621@dataclass(slots=False, frozen=False) 

1622class HypothesisHandle: 

1623 """This object is provided as the .hypothesis attribute on @given tests. 

1624 

1625 Downstream users can reassign its attributes to insert custom logic into 

1626 the execution of each case, for example by converting an async into a 

1627 sync function. 

1628 

1629 This must be an attribute of an attribute, because reassignment of a 

1630 first-level attribute would not be visible to Hypothesis if the function 

1631 had been decorated before the assignment. 

1632 

1633 See https://github.com/HypothesisWorks/hypothesis/issues/1257 for more 

1634 information. 

1635 """ 

1636 

1637 inner_test: Any 

1638 _get_fuzz_target: Any 

1639 _given_kwargs: Any 

1640 

1641 @property 

1642 def fuzz_one_input( 

1643 self, 

1644 ) -> Callable[[bytes | bytearray | memoryview | BinaryIO], bytes | None]: 

1645 """ 

1646 Run the test as a fuzz target, driven with the ``buffer`` of bytes. 

1647 

1648 Depending on the passed ``buffer`` one of three things will happen: 

1649 

1650 * If the bytestring was invalid, for example because it was too short or was 

1651 filtered out by |assume| or |.filter|, |fuzz_one_input| returns ``None``. 

1652 * If the bytestring was valid and the test passed, |fuzz_one_input| returns 

1653 a canonicalised and pruned bytestring which will replay that test case. 

1654 This is provided as an option to improve the performance of mutating 

1655 fuzzers, but can safely be ignored. 

1656 * If the test *failed*, i.e. raised an exception, |fuzz_one_input| will 

1657 add the pruned buffer to :ref:`the Hypothesis example database <database>` 

1658 and then re-raise that exception. All you need to do to reproduce, 

1659 minimize, and de-duplicate all the failures found via fuzzing is run 

1660 your test suite! 

1661 

1662 To reduce the performance impact of database writes, |fuzz_one_input| only 

1663 records failing inputs which would be valid shrinks for a known failure - 

1664 meaning writes are somewhere between constant and log(N) rather than linear 

1665 in runtime. However, this tracking only works within a persistent fuzzing 

1666 process; for forkserver fuzzers we recommend ``database=None`` for the main 

1667 run, and then replaying with a database enabled if you need to analyse 

1668 failures. 

1669 

1670 Note that the interpretation of both input and output bytestrings is 

1671 specific to the exact version of Hypothesis you are using and the strategies 

1672 given to the test, just like the :ref:`database <database>` and 

1673 |@reproduce_failure|. 

1674 

1675 Interaction with |@settings| 

1676 ---------------------------- 

1677 

1678 |fuzz_one_input| uses just enough of Hypothesis' internals to drive your 

1679 test function with a bytestring, and most settings therefore have no effect 

1680 in this mode. We recommend running your tests the usual way before fuzzing 

1681 to get the benefits of health checks, as well as afterwards to replay, 

1682 shrink, deduplicate, and report whatever errors were discovered. 

1683 

1684 * |settings.database| *is* used by |fuzz_one_input| - adding failures to 

1685 the database to be replayed when 

1686 you next run your tests is our preferred reporting mechanism and response 

1687 to `the 'fuzzer taming' problem <https://blog.regehr.org/archives/925>`__. 

1688 * |settings.verbosity| and |settings.stateful_step_count| work as usual. 

1689 * The |~settings.deadline|, |~settings.derandomize|, |~settings.max_examples|, 

1690 |~settings.phases|, |~settings.print_blob|, |~settings.report_multiple_bugs|, 

1691 and |~settings.suppress_health_check| settings do not affect |fuzz_one_input|. 

1692 

1693 Example Usage 

1694 ------------- 

1695 

1696 .. code-block:: python 

1697 

1698 @given(st.text()) 

1699 def test_foo(s): ... 

1700 

1701 # This is a traditional fuzz target - call it with a bytestring, 

1702 # or a binary IO object, and it runs the test once. 

1703 fuzz_target = test_foo.hypothesis.fuzz_one_input 

1704 

1705 # For example: 

1706 fuzz_target(b"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00") 

1707 fuzz_target(io.BytesIO(b"\\x01")) 

1708 

1709 .. tip:: 

1710 

1711 If you expect to discover many failures while using |fuzz_one_input|, 

1712 consider wrapping your database with |BackgroundWriteDatabase|, for 

1713 low-overhead writes of failures. 

1714 

1715 .. tip:: 

1716 

1717 | Want an integrated workflow for your team's local tests, CI, and continuous fuzzing? 

1718 | Use `HypoFuzz <https://hypofuzz.com/>`__ to fuzz your whole test suite, and find more bugs with the same tests! 

1719 

1720 .. seealso:: 

1721 

1722 See also the :doc:`/how-to/external-fuzzers` how-to. 

1723 """ 

1724 # Note: most users, if they care about fuzzer performance, will access the 

1725 # property and assign it to a local variable to move the attribute lookup 

1726 # outside their fuzzing loop / before the fork point. We cache it anyway, 

1727 # so that naive or unusual use-cases get the best possible performance too. 

1728 try: 

1729 return self.__cached_target # type: ignore 

1730 except AttributeError: 

1731 self.__cached_target = self._get_fuzz_target() 

1732 return self.__cached_target 

1733 

1734 

1735@overload 

1736def given( 

1737 _: EllipsisType, / 

1738) -> Callable[ 

1739 [Callable[..., Coroutine[Any, Any, None] | None]], Callable[[], None] 

1740]: # pragma: no cover 

1741 ... 

1742 

1743 

# Overload for positional strategy arguments, e.g. @given(st.integers()).
@overload
def given(*_given_arguments: SearchStrategy[Any]) -> Callable[
    [Callable[..., Coroutine[Any, Any, None] | None]],
    Callable[..., None],
]:  # pragma: no cover
    ...

1751 

1752 

# Overload for keyword strategy arguments, e.g. @given(x=st.integers()),
# where any individual value may also be `...` to infer from annotations.
@overload
def given(**_given_kwargs: SearchStrategy[Any] | EllipsisType) -> Callable[
    [Callable[..., Coroutine[Any, Any, None] | None]],
    Callable[..., None],
]:  # pragma: no cover
    ...

1760 

1761 

def given(
    *_given_arguments: SearchStrategy[Any] | EllipsisType,
    **_given_kwargs: SearchStrategy[Any] | EllipsisType,
) -> Callable[[Callable[..., Coroutine[Any, Any, None] | None]], Callable[..., None]]:
    """
    The |@given| decorator turns a function into a Hypothesis test. This is the
    main entry point to Hypothesis.

    .. seealso::

        See also the :doc:`/tutorial/introduction` tutorial, which introduces
        defining Hypothesis tests with |@given|.

    .. _given-arguments:

    Arguments to ``@given``
    -----------------------

    Arguments to |@given| may be either positional or keyword arguments:

    .. code-block:: python

        @given(st.integers(), st.floats())
        def test_one(x, y):
            pass

        @given(x=st.integers(), y=st.floats())
        def test_two(x, y):
            pass

    If using keyword arguments, the arguments may appear in any order, as with
    standard Python functions:

    .. code-block:: python

        # different order, but still equivalent to before
        @given(y=st.floats(), x=st.integers())
        def test(x, y):
            assert isinstance(x, int)
            assert isinstance(y, float)

    If |@given| is provided fewer positional arguments than the decorated test,
    the test arguments are filled in on the right side, leaving the leftmost
    positional arguments unfilled:

    .. code-block:: python

        @given(st.integers(), st.floats())
        def test(manual_string, y, z):
            assert manual_string == "x"
            assert isinstance(y, int)
            assert isinstance(z, float)

        # `test` is now a callable which takes one argument `manual_string`

        test("x")
        # or equivalently:
        test(manual_string="x")

    The reason for this "from the right" behavior is to support using |@given|
    with instance methods, by automatically passing through ``self``:

    .. code-block:: python

        class MyTest(TestCase):
            @given(st.integers())
            def test(self, x):
                assert isinstance(self, MyTest)
                assert isinstance(x, int)

    If (and only if) using keyword arguments, |@given| may be combined with
    ``**kwargs`` or ``*args``:

    .. code-block:: python

        @given(x=integers(), y=integers())
        def test(x, **kwargs):
            assert "y" in kwargs

        @given(x=integers(), y=integers())
        def test(x, *args, **kwargs):
            assert args == ()
            assert "x" not in kwargs
            assert "y" in kwargs

    It is an error to:

    * Mix positional and keyword arguments to |@given|.
    * Use |@given| with a function that has a default value for an argument.
    * Use |@given| with positional arguments with a function that uses ``*args``,
      ``**kwargs``, or keyword-only arguments.

    The function returned by given has all the same arguments as the original
    test, minus those that are filled in by |@given|. See the :ref:`notes on
    framework compatibility <framework-compatibility>` for how this interacts
    with features of other testing libraries, such as :pypi:`pytest` fixtures.
    """

    # Calling @given while already *inside* a running Hypothesis test means a
    # nested @given, which multiplies generation/shrinking cost -- reject it
    # up front via the health-check mechanism (so it can still be suppressed).
    if currently_in_test_context():
        fail_health_check(
            Settings(),
            "Nesting @given tests results in quadratic generation and shrinking "
            "behavior, and can usually be more cleanly expressed by replacing the "
            "inner function with an st.data() parameter on the outer @given."
            "\n\n"
            "If it is difficult or impossible to refactor this test to remove the "
            "nested @given, you can disable this health check with "
            "@settings(suppress_health_check=[HealthCheck.nested_given]) on the "
            "outer @given. See "
            "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
            "for details.",
            HealthCheck.nested_given,
        )

    # The actual decorator. Everything in this function runs once, at
    # decoration time, for each test function @given is applied to; it
    # validates the arguments and builds (and returns) the wrapped test.
    def run_test_as_given(test):
        if inspect.isclass(test):
            # Provide a meaningful error to users, instead of exceptions from
            # internals that assume we're dealing with a function.
            raise InvalidArgument("@given cannot be applied to a class")

        # Detect pytest>=8.4 fixture objects (which wrap the function in a
        # FixtureFunctionDefinition) and reject them with a clear message.
        if (
            "_pytest" in sys.modules
            and "_pytest.fixtures" in sys.modules
            and (
                tuple(map(int, sys.modules["_pytest"].__version__.split(".")[:2]))
                >= (8, 4)
            )
            and isinstance(
                test, sys.modules["_pytest.fixtures"].FixtureFunctionDefinition
            )
        ):  # pragma: no cover # covered by pytest/test_fixtures, but not by cover/
            raise InvalidArgument("@given cannot be applied to a pytest fixture")

        given_arguments = tuple(_given_arguments)
        given_kwargs = dict(_given_kwargs)

        original_sig = get_signature(test)
        if given_arguments == (Ellipsis,) and not given_kwargs:
            # user indicated that they want to infer all arguments
            given_kwargs = {
                p.name: Ellipsis
                for p in original_sig.parameters.values()
                if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
            }
            given_arguments = ()

        check_invalid = is_invalid_test(
            test, original_sig, given_arguments, given_kwargs
        )

        # If the argument check found problems, return a dummy test function
        # that will raise an error if it is actually called.
        if check_invalid is not None:
            return check_invalid

        # Because the argument check succeeded, we can convert @given's
        # positional arguments into keyword arguments for simplicity.
        if given_arguments:
            assert not given_kwargs
            posargs = [
                p.name
                for p in original_sig.parameters.values()
                if p.kind is p.POSITIONAL_OR_KEYWORD
            ]
            # Reversing both sequences before zipping implements the documented
            # "fill from the right" rule (leftmost params stay unfilled).
            given_kwargs = dict(
                list(zip(posargs[::-1], given_arguments[::-1], strict=False))[::-1]
            )
        # These have been converted, so delete them to prevent accidental use.
        del given_arguments

        new_signature = new_given_signature(original_sig, given_kwargs)

        # Use type information to convert "infer" arguments into appropriate strategies.
        if ... in given_kwargs.values():
            hints = get_type_hints(test)
            for name in [name for name, value in given_kwargs.items() if value is ...]:
                if name not in hints:
                    return _invalid(
                        f"passed {name}=... for {test.__name__}, but {name} has "
                        "no type annotation",
                        test=test,
                        given_kwargs=given_kwargs,
                    )
                given_kwargs[name] = st.from_type(hints[name])

        # only raise if the same thread uses two different executors, not if two
        # different threads use different executors.
        thread_local = ThreadLocal(prev_self=lambda: not_set)
        # maps thread_id to whether that thread overlaps in execution with any
        # other thread in this @given. We use this to detect whether an @given is
        # being run from multiple different threads at once, which informs
        # decisions like whether to raise DeadlineExceeded or HealthCheck.too_slow.
        thread_overlap: dict[int, bool] = {}
        thread_overlap_lock = Lock()

        # The replacement test function: each call to wrapped_test is one full
        # Hypothesis run (explicit examples plus engine-generated inputs).
        @impersonate(test)
        @define_function_signature(test.__name__, test.__doc__, new_signature)
        def wrapped_test(*arguments, **kwargs):
            # Tell pytest to omit the body of this function from tracebacks
            __tracebackhide__ = True
            with thread_overlap_lock:
                # Any thread already registered here overlaps with us now.
                for overlap_thread_id in thread_overlap:
                    thread_overlap[overlap_thread_id] = True

                threadid = threading.get_ident()
                # if there are existing threads when this thread starts, then
                # this thread starts at an overlapped state.
                has_existing_threads = len(thread_overlap) > 0
                thread_overlap[threadid] = has_existing_threads

            try:
                # inner_test is reassignable (e.g. by extensions), so re-read
                # it on every call rather than closing over `test`.
                test = wrapped_test.hypothesis.inner_test
                if getattr(test, "is_hypothesis_test", False):
                    raise InvalidArgument(
                        f"You have applied @given to the test {test.__name__} more than "
                        "once, which wraps the test several times and is extremely slow. "
                        "A similar effect can be gained by combining the arguments "
                        "of the two calls to given. For example, instead of "
                        "@given(booleans()) @given(integers()), you could write "
                        "@given(booleans(), integers())"
                    )

                settings = wrapped_test._hypothesis_internal_use_settings
                random = get_random_for_wrapped_test(test, wrapped_test)
                arguments, kwargs, stuff = process_arguments_to_given(
                    wrapped_test,
                    arguments,
                    kwargs,
                    given_kwargs,
                    new_signature.parameters,
                )

                if (
                    inspect.iscoroutinefunction(test)
                    and get_executor(stuff.selfy) is default_executor
                ):
                    # See https://github.com/HypothesisWorks/hypothesis/issues/3054
                    # If our custom executor doesn't handle coroutines, or we return an
                    # awaitable from a non-async-def function, we just rely on the
                    # return_value health check. This catches most user errors though.
                    raise InvalidArgument(
                        "Hypothesis doesn't know how to run async test functions like "
                        f"{test.__name__}. You'll need to write a custom executor, "
                        "or use a library like pytest-asyncio or pytest-trio which can "
                        "handle the translation for you.\n See https://hypothesis."
                        "readthedocs.io/en/latest/details.html#custom-function-execution"
                    )

                runner = stuff.selfy
                if isinstance(stuff.selfy, TestCase) and test.__name__ in dir(TestCase):
                    fail_health_check(
                        settings,
                        f"You have applied @given to the method {test.__name__}, which is "
                        "used by the unittest runner but is not itself a test. "
                        "This is not useful in any way.",
                        HealthCheck.not_a_test_method,
                    )
                if bad_django_TestCase(runner):  # pragma: no cover
                    # Covered by the Django tests, but not the pytest coverage task
                    raise InvalidArgument(
                        "You have applied @given to a method on "
                        f"{type(runner).__qualname__}, but this "
                        "class does not inherit from the supported versions in "
                        "`hypothesis.extra.django`. Use the Hypothesis variants "
                        "to ensure that each example is run in a separate "
                        "database transaction."
                    )

                nonlocal thread_local
                # Check selfy really is self (not e.g. a mock) before we health-check
                cur_self = (
                    stuff.selfy
                    if getattr(type(stuff.selfy), test.__name__, None) is wrapped_test
                    else None
                )
                if thread_local.prev_self is not_set:
                    thread_local.prev_self = cur_self
                elif cur_self is not thread_local.prev_self:
                    fail_health_check(
                        settings,
                        f"The method {test.__qualname__} was called from multiple "
                        "different executors. This may lead to flaky tests and "
                        "nonreproducible errors when replaying from database."
                        "\n\n"
                        "Unlike most health checks, HealthCheck.differing_executors "
                        "warns about a correctness issue with your test. We "
                        "therefore recommend fixing the underlying issue, rather "
                        "than suppressing this health check. However, if you are "
                        "confident this health check can be safely disabled, you can "
                        "do so with "
                        "@settings(suppress_health_check=[HealthCheck.differing_executors]). "
                        "See "
                        "https://hypothesis.readthedocs.io/en/latest/reference/api.html#hypothesis.HealthCheck "
                        "for details.",
                        HealthCheck.differing_executors,
                    )

                state = StateForActualGivenExecution(
                    stuff,
                    test,
                    settings,
                    random,
                    wrapped_test,
                    thread_overlap=thread_overlap,
                )

                # If there was a @reproduce_failure decorator, use it to reproduce
                # the error (or complain that we couldn't). Either way, this will
                # always raise some kind of error.
                if (
                    reproduce_failure := wrapped_test._hypothesis_internal_use_reproduce_failure
                ) is not None:
                    expected_version, failure = reproduce_failure
                    if expected_version != __version__:
                        raise InvalidArgument(
                            "Attempting to reproduce a failure from a different "
                            f"version of Hypothesis. This failure is from {expected_version}, but "
                            f"you are currently running {__version__!r}. Please change your "
                            "Hypothesis version to a matching one."
                        )
                    try:
                        state.execute_once(
                            ConjectureData.for_choices(decode_failure(failure)),
                            print_example=True,
                            is_final=True,
                        )
                        raise DidNotReproduce(
                            "Expected the test to raise an error, but it "
                            "completed successfully."
                        )
                    except StopTest:
                        raise DidNotReproduce(
                            "The shape of the test data has changed in some way "
                            "from where this blob was defined. Are you sure "
                            "you're running the same test?"
                        ) from None
                    except UnsatisfiedAssumption:
                        raise DidNotReproduce(
                            "The test data failed to satisfy an assumption in the "
                            "test. Have you added it since this blob was generated?"
                        ) from None

                # There was no @reproduce_failure, so start by running any explicit
                # examples from @example decorators.
                if errors := list(
                    execute_explicit_examples(
                        state, wrapped_test, arguments, kwargs, original_sig
                    )
                ):
                    # If we're not going to report multiple bugs, we would have
                    # stopped running explicit examples at the first failure.
                    assert len(errors) == 1 or state.settings.report_multiple_bugs

                    # If an explicit example raised a 'skip' exception, ensure it's never
                    # wrapped up in an exception group. Because we break out of the loop
                    # immediately on finding a skip, if present it's always the last error.
                    if isinstance(errors[-1][1], skip_exceptions_to_reraise()):
                        # Covered by `test_issue_3453_regression`, just in a subprocess.
                        del errors[:-1]  # pragma: no cover

                    _raise_to_user(errors, state.settings, [], " in explicit examples")

                # If there were any explicit examples, they all ran successfully.
                # The next step is to use the Conjecture engine to run the test on
                # many different inputs.
                ran_explicit_examples = (
                    Phase.explicit in state.settings.phases
                    and getattr(wrapped_test, "hypothesis_explicit_examples", ())
                )
                SKIP_BECAUSE_NO_EXAMPLES = unittest.SkipTest(
                    "Hypothesis has been told to run no examples for this test."
                )
                if not (
                    Phase.reuse in settings.phases or Phase.generate in settings.phases
                ):
                    # Nothing for the engine to do; skip unless explicit
                    # examples already gave this run some coverage.
                    if not ran_explicit_examples:
                        raise SKIP_BECAUSE_NO_EXAMPLES
                    return

                try:
                    # Temporarily swap out TestCase.subTest, which would
                    # otherwise interfere with the engine's own reporting.
                    if isinstance(runner, TestCase) and hasattr(runner, "subTest"):
                        subTest = runner.subTest
                        try:
                            runner.subTest = types.MethodType(fake_subTest, runner)
                            state.run_engine()
                        finally:
                            runner.subTest = subTest
                    else:
                        state.run_engine()
                except BaseException as e:
                    # The exception caught here should either be an actual test
                    # failure (or BaseExceptionGroup), or some kind of fatal error
                    # that caused the engine to stop.
                    generated_seed = (
                        wrapped_test._hypothesis_internal_use_generated_seed
                    )
                    with local_settings(settings):
                        if not (state.failed_normally or generated_seed is None):
                            if running_under_pytest:
                                report(
                                    f"You can add @seed({generated_seed}) to this test or "
                                    f"run pytest with --hypothesis-seed={generated_seed} "
                                    "to reproduce this failure."
                                )
                            else:
                                report(
                                    f"You can add @seed({generated_seed}) to this test to "
                                    "reproduce this failure."
                                )
                        # The dance here is to avoid showing users long tracebacks
                        # full of Hypothesis internals they don't care about.
                        # We have to do this inline, to avoid adding another
                        # internal stack frame just when we've removed the rest.
                        #
                        # Using a variable for our trimmed error ensures that the line
                        # which will actually appear in tracebacks is as clear as
                        # possible - "raise the_error_hypothesis_found".
                        the_error_hypothesis_found = e.with_traceback(
                            None
                            if isinstance(e, BaseExceptionGroup)
                            else get_trimmed_traceback()
                        )
                        raise the_error_hypothesis_found

                if not (ran_explicit_examples or state.ever_executed):
                    raise SKIP_BECAUSE_NO_EXAMPLES
            finally:
                # Always deregister this thread from the overlap map, even on
                # failure, so later runs see an accurate picture.
                with thread_overlap_lock:
                    del thread_overlap[threadid]

        def _get_fuzz_target() -> (
            Callable[[bytes | bytearray | memoryview | BinaryIO], bytes | None]
        ):
            # Because fuzzing interfaces are very performance-sensitive, we use a
            # somewhat more complicated structure here. `_get_fuzz_target()` is
            # called by the `HypothesisHandle.fuzz_one_input` property, allowing
            # us to defer our collection of the settings, random instance, and
            # reassignable `inner_test` (etc) until `fuzz_one_input` is accessed.
            #
            # We then share the performance cost of setting up `state` between
            # many invocations of the target. We explicitly force `deadline=None`
            # for performance reasons, saving ~40% the runtime of an empty test.
            test = wrapped_test.hypothesis.inner_test
            settings = Settings(
                parent=wrapped_test._hypothesis_internal_use_settings, deadline=None
            )
            random = get_random_for_wrapped_test(test, wrapped_test)
            _args, _kwargs, stuff = process_arguments_to_given(
                wrapped_test, (), {}, given_kwargs, new_signature.parameters
            )
            assert not _args
            assert not _kwargs
            state = StateForActualGivenExecution(
                stuff,
                test,
                settings,
                random,
                wrapped_test,
                thread_overlap=thread_overlap,
            )
            database_key = function_digest(test) + b".secondary"
            # We track the minimal-so-far example for each distinct origin, so
            # that we track log-n instead of n examples for long runs. In particular
            # it means that we saturate for common errors in long runs instead of
            # storing huge volumes of low-value data.
            minimal_failures: dict = {}

            def fuzz_one_input(
                buffer: bytes | bytearray | memoryview | BinaryIO,
            ) -> bytes | None:
                # This inner part is all that the fuzzer will actually run,
                # so we keep it as small and as fast as possible.
                if isinstance(buffer, io.IOBase):
                    buffer = buffer.read(BUFFER_SIZE)
                assert isinstance(buffer, (bytes, bytearray, memoryview))
                data = ConjectureData(
                    random=None,
                    provider=BytestringProvider,
                    provider_kw={"bytestring": buffer},
                )
                try:
                    state.execute_once(data)
                    status = Status.VALID
                except StopTest:
                    # Ran out of buffer, or the data was otherwise rejected
                    # mid-draw; report the status the data itself recorded.
                    status = data.status
                    return None
                except UnsatisfiedAssumption:
                    status = Status.INVALID
                    return None
                except BaseException:
                    # Test failure: record it in the database if it's at least
                    # as small as the smallest known failure of this origin,
                    # then let the exception propagate to the fuzzer.
                    known = minimal_failures.get(data.interesting_origin)
                    if settings.database is not None and (
                        known is None or sort_key(data.nodes) <= sort_key(known)
                    ):
                        settings.database.save(
                            database_key, choices_to_bytes(data.choices)
                        )
                        minimal_failures[data.interesting_origin] = data.nodes
                    status = Status.INTERESTING
                    raise
                finally:
                    # Emit an observability record for every input, whatever
                    # its outcome (hence `finally`).
                    if observability_enabled():
                        data.freeze()
                        tc = make_testcase(
                            run_start=state._start_timestamp,
                            property=state.test_identifier,
                            data=data,
                            how_generated="fuzz_one_input",
                            representation=state._string_repr,
                            arguments=data._observability_args,
                            timing=state._timing_features,
                            coverage=None,
                            status=status,
                            backend_metadata=data.provider.observe_test_case(),
                        )
                        deliver_observation(tc)
                        state._timing_features = {}

                # Valid and passing: return the canonicalised bytes actually
                # consumed, as a replay/pruning hint for the fuzzer.
                assert isinstance(data.provider, BytestringProvider)
                return bytes(data.provider.drawn)

            fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
            return fuzz_one_input

        # After having created the decorated test function, we need to copy
        # over some attributes to make the switch as seamless as possible.

        for attrib in dir(test):
            if not (attrib.startswith("_") or hasattr(wrapped_test, attrib)):
                setattr(wrapped_test, attrib, getattr(test, attrib))
        wrapped_test.is_hypothesis_test = True
        if hasattr(test, "_hypothesis_internal_settings_applied"):
            # Used to check if @settings is applied twice.
            wrapped_test._hypothesis_internal_settings_applied = True
        wrapped_test._hypothesis_internal_use_seed = getattr(
            test, "_hypothesis_internal_use_seed", None
        )
        wrapped_test._hypothesis_internal_use_settings = (
            getattr(test, "_hypothesis_internal_use_settings", None) or Settings.default
        )
        wrapped_test._hypothesis_internal_use_reproduce_failure = getattr(
            test, "_hypothesis_internal_use_reproduce_failure", None
        )
        wrapped_test.hypothesis = HypothesisHandle(test, _get_fuzz_target, given_kwargs)
        return wrapped_test

    return run_test_as_given

2309 

2310 

def find(
    specifier: SearchStrategy[Ex],
    condition: Callable[[Any], bool],
    *,
    settings: Settings | None = None,
    random: Random | None = None,
    database_key: bytes | None = None,
) -> Ex:
    """Returns the minimal example from the given strategy ``specifier`` that
    matches the predicate function ``condition``."""
    # Health checks are irrelevant for a search, and we only ever want one
    # (minimal) result, so force those settings on top of whatever was passed.
    base = Settings(max_examples=2000) if settings is None else settings
    settings = Settings(
        base, suppress_health_check=list(HealthCheck), report_multiple_bugs=False
    )

    if database_key is None and settings.database is not None:
        # Note: The database key is not guaranteed to be unique. If not, replaying
        # of database examples may fail to reproduce due to being replayed on the
        # wrong condition.
        database_key = function_digest(condition)

    if not isinstance(specifier, SearchStrategy):
        raise InvalidArgument(
            f"Expected SearchStrategy but got {specifier!r} of "
            f"type {type(specifier).__name__}"
        )
    specifier.validate()

    # Holds the matching example (if any); a one-element list lets the inner
    # test write to it without `nonlocal`.
    matches: list[Ex] = []

    @settings
    @given(specifier)
    def test(candidate):
        if condition(candidate):
            matches[:] = [candidate]
            raise Found

    if random is not None:
        test = seed(random.getrandbits(64))(test)

    test._hypothesis_internal_database_key = database_key  # type: ignore

    try:
        test()
    except Found:
        # Raising Found aborts shrinking at the current (minimal) example.
        return matches[0]

    raise NoSuchExample(get_pretty_function_description(condition))