Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/control.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

166 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import inspect 

12import math 

13import random 

14from collections import defaultdict 

15from collections.abc import Callable, Generator, Sequence 

16from contextlib import contextmanager 

17from types import TracebackType 

18from typing import TYPE_CHECKING, Any, Literal, NoReturn, Optional, overload 

19from weakref import WeakKeyDictionary 

20 

21from hypothesis import Verbosity, settings 

22from hypothesis.errors import InvalidArgument, UnsatisfiedAssumption 

23from hypothesis.internal.compat import BaseExceptionGroup 

24from hypothesis.internal.conjecture.data import ConjectureData 

25from hypothesis.internal.observability import observability_enabled 

26from hypothesis.internal.reflection import get_pretty_function_description 

27from hypothesis.internal.validation import check_type 

28from hypothesis.reporting import report, verbose_report 

29from hypothesis.utils.deprecation import note_deprecation 

30from hypothesis.utils.dynamicvariables import DynamicVariable 

31from hypothesis.vendor.pretty import ArgLabelsT, IDKey, PrettyPrintFunction, pretty 

32 

33if TYPE_CHECKING: 

34 from typing_extensions import Self 

35 

36 

def _calling_function_location(what: str, frame: Any) -> str:
    """Describe where the helper named ``what`` was called from.

    ``frame`` is the frame of the helper itself (callers pass
    ``inspect.currentframe()``); the description names the function and
    line number of the frame that called it, i.e. ``frame.f_back``.

    ``inspect.currentframe()`` returns ``None`` on interpreters without
    stack-frame support, and an outermost frame has no ``f_back``. In either
    case a placeholder location is returned rather than letting an
    ``AttributeError`` escape, so callers can still build their message.
    """
    where = frame.f_back if frame is not None else None
    if where is None:
        return f"{what}() in <unknown> (line ?)"
    return f"{what}() in {where.f_code.co_name} (line {where.f_lineno})"

40 

41 

def reject() -> NoReturn:
    """Abandon the current example by raising ``UnsatisfiedAssumption``.

    When no build context is active a deprecation is noted first. When one
    is active, the rejection is counted against the call site in the
    context data's ``_observability_predicates`` before raising.
    """
    context = _current_build_context.value
    if context is None:
        note_deprecation(
            "Using `reject` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    # currentframe() must be called here so that f_back is reject()'s caller.
    call_site = _calling_function_location("reject", inspect.currentframe())
    if context is not None:
        tally = context.data._observability_predicates[call_site]
        tally.update_count(condition=False)
    raise UnsatisfiedAssumption(call_site)

54 

55 

@overload
def assume(condition: Literal[False] | None) -> NoReturn: ...
@overload
def assume(condition: object) -> Literal[True]: ...


def assume(condition: object) -> Literal[True]:
    """Mark the current example as bad unless ``condition`` is truthy.

    ``assume`` works like an :ref:`assert <python:assert>` that rejects the
    example instead of failing the test: you state the properties you
    *assume* to hold, and Hypothesis tries to avoid similar examples in
    future.

    Returns ``True`` when the assumption holds and raises
    ``UnsatisfiedAssumption`` otherwise. Calling it outside a property-based
    test additionally notes a deprecation.
    """
    context = _current_build_context.value
    if context is None:
        note_deprecation(
            "Using `assume` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    # The call site is only needed for observability bookkeeping or for the
    # failure message, so skip the frame lookup on the common happy path.
    if observability_enabled() or not condition:
        call_site = _calling_function_location("assume", inspect.currentframe())
        if observability_enabled() and context is not None:
            tally = context.data._observability_predicates[call_site]
            tally.update_count(condition=bool(condition))
        if not condition:
            raise UnsatisfiedAssumption(f"failed to satisfy {call_site}")
    return True

83 

84 

# The BuildContext that is currently active, or None when there is none.
# BuildContext.__enter__ installs itself here through with_value(), and the
# helpers in this module read `.value` to decide whether they are running
# inside a test.
_current_build_context = DynamicVariable[Optional["BuildContext"]](None)

86 

87 

def currently_in_test_context() -> bool:
    """Tell whether the caller is executing inside a property-based test.

    Returns ``True`` while running inside an |@given| or
    :ref:`stateful <stateful>` test, and ``False`` otherwise.

    Third-party integrations and assertion helpers can be invoked from both
    traditional and property-based tests; they can use this check to guard
    calls such as |assume| or |target|, which are only usable in the latter.
    """
    active_context = _current_build_context.value
    return active_context is not None

97 

98 

def current_build_context() -> "BuildContext":
    """Return the active ``BuildContext``.

    Raises ``InvalidArgument`` when no build context is registered.
    """
    if (active := _current_build_context.value) is not None:
        return active
    raise InvalidArgument("No build context registered")

104 

105 

@contextmanager
def deprecate_random_in_strategy(fmt: str, *args: Any) -> Generator[None, None, None]:
    """Note a deprecation if the wrapped block changes the global ``random`` state.

    The state of the ``random`` module is captured on entry and compared with
    the state after the block completes. ``fmt.format(*args)`` is appended to
    the deprecation message to say where the offending use came from.
    """
    from hypothesis.internal import entropy

    before = random.getstate()
    yield
    after = random.getstate()
    if after == before:
        return

    # A changed state is not always our caller's doing: there is a threading
    # race with deterministic_PRNG. Starting from global random state A,
    # deterministic_PRNG sets state B (the same in every thread, because it
    # always seeds to 0). The following interleaving is then possible:
    #
    #   A -> B             a thread enters deterministic_PRNG()
    #   B -> B             another thread enters deterministic_PRNG()
    #   state_before = B   this thread enters deprecate_random_in_strategy
    #   B -> A             a different thread leaves deterministic_PRNG()
    #   state_after = A    this thread leaves deprecate_random_in_strategy
    #
    # so the states differ only because a different thread reset the global
    # random state. To cope with that, the states set by deterministic_PRNG
    # are tracked, and no deprecation is noted when we end up in one of them.
    if hash(after) in entropy._known_random_state_hashes:
        return

    note_deprecation(
        "Do not use the `random` module inside strategies; instead "
        "consider `st.randoms()`, `st.sampled_from()`, etc. " + fmt.format(*args),
        since="2024-02-05",
        has_codemod=False,
        stacklevel=1,
    )

142 

143 

class BuildContext:
    """State for one run of a test, usable as a context manager.

    Entering the context installs this object as the value of
    ``_current_build_context``; leaving it undoes that and then runs every
    callable collected in ``tasks``.
    """

    def __init__(
        self,
        data: ConjectureData,
        *,
        is_final: bool = False,
        wrapped_test: Callable,
    ) -> None:
        """Store the arguments and create empty bookkeeping containers.

        ``data`` is the object values are drawn from, ``is_final`` is read by
        ``should_note()``, and ``wrapped_test`` is stored without being used
        inside this class.
        """
        self.data = data
        # Teardown callables; cleanup() appends here and __exit__ runs them.
        self.tasks: list[Callable[[], Any]] = []
        self.is_final = is_final
        self.wrapped_test = wrapped_test

        # Use defaultdict(list) here to handle the possibility of having multiple
        # functions registered for the same object (due to caching, small ints, etc).
        # The printer will discard duplicates which return different representations.
        self.known_object_printers: dict[IDKey, list[PrettyPrintFunction]] = (
            defaultdict(list)
        )

        # Track nested strategy calls for explain-phase label paths
        self._label_path: list[str] = []

    @contextmanager
    def track_arg_label(self, label: str) -> Generator[ArgLabelsT, None, None]:
        """Record which span of ``data.nodes`` was produced under ``label``.

        Yields a dict which starts out empty. Once the wrapped block has
        finished, ``label`` is mapped to ``(start, end)`` in that dict and the
        same pair is added to ``data.arg_slices``, unless no nodes were added
        in between. ``label`` is on ``_label_path`` for the duration of the
        block and is always popped again.
        """
        start = len(self.data.nodes)
        self._label_path.append(label)
        arg_labels: ArgLabelsT = {}
        try:
            yield arg_labels
        finally:
            self._label_path.pop()

        # NOTE(review): the indentation of this source was lost, so it is not
        # certain whether the code below belongs inside the ``finally`` above
        # (running on errors too) or after it (success only) - confirm
        # against the upstream file.
        #
        # This high up the stack, we can't see or really do much with
        # Span / SpanRecord - not least because they're only materialized
        # after the test case is completed.
        #
        # Instead, we'll stash the (start_idx, end_idx) pair on our data object
        # for the ConjectureRunner engine to deal with, and mutate the arg_labels
        # dict so that the pretty-printer knows where to place the
        # which-parts-matter comments later.
        end = len(self.data.nodes)
        assert start <= end
        if start != end:
            arg_labels[label] = (start, end)
            self.data.arg_slices.add((start, end))

    def record_call(
        self,
        obj: object,
        func: object,
        *,
        args: Sequence[object],
        kwargs: dict[str, object],
        arg_labels: ArgLabelsT | None = None,
    ) -> None:
        """Register a printer that can show ``obj`` as the call that made it.

        The printer is appended to ``known_object_printers`` under
        ``IDKey(obj)`` and delegates to the pretty-printer's
        ``maybe_repr_known_object_as_call`` with a description of ``func``
        and the given ``args``, ``kwargs`` and ``arg_labels``.
        """
        self.known_object_printers[IDKey(obj)].append(
            # func and arg_labels are bound as keyword defaults; args and
            # kwargs are captured from the enclosing call.
            lambda obj, p, cycle, *, _func=func, _arg_labels=arg_labels: p.maybe_repr_known_object_as_call(  # type: ignore
                obj,
                cycle,
                get_pretty_function_description(_func),
                args,
                kwargs,
                arg_labels=_arg_labels,
            )
        )

    def prep_args_kwargs_from_strategies(
        self,
        kwarg_strategies: dict[str, Any],
    ) -> tuple[dict[str, Any], ArgLabelsT]:
        """Draw one value for each entry of ``kwarg_strategies``.

        Returns ``(kwargs, arg_labels)``: the drawn values keyed by argument
        name, and the merged label-to-span mappings produced by
        ``track_arg_label`` for each draw.
        """
        arg_labels: ArgLabelsT = {}
        kwargs: dict[str, Any] = {}

        for k, s in kwarg_strategies.items():
            with (
                self.track_arg_label(k) as arg_label,
                deprecate_random_in_strategy("from {}={!r}", k, s),
            ):
                kwargs[k] = self.data.draw(s, observe_as=f"generate:{k}")
            # track_arg_label only fills arg_label after the block has ended,
            # so the merge has to happen after the ``with`` statement.
            arg_labels |= arg_label

        return kwargs, arg_labels

    def __enter__(self) -> "Self":
        """Make this context the current one and return it."""
        self.assign_variable = _current_build_context.with_value(self)
        self.assign_variable.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Stop being the current context, then run the registered tasks.

        Every task in ``tasks`` is called in list order, even if earlier ones
        raise. If exactly one task raised, that exception is re-raised; if
        several did, they are raised together as a
        ``BaseExceptionGroup("Cleanup failed", ...)``. Either way the raised
        exception is chained from ``exc_value``.
        """
        self.assign_variable.__exit__(exc_type, exc_value, tb)
        errors = []
        for task in self.tasks:
            try:
                task()
            except BaseException as err:
                # Keep going so that one failing task cannot skip the others.
                errors.append(err)
        if errors:
            if len(errors) == 1:
                raise errors[0] from exc_value
            raise BaseExceptionGroup("Cleanup failed", errors) from exc_value

250 

251 

def cleanup(teardown: Callable[[], Any]) -> None:
    """Register ``teardown`` to be called when the current build context exits.

    ``BuildContext.__exit__`` calls every registered function in registration
    order, even when an earlier one fails. Exceptions raised by teardowns are
    collected and re-raised afterwards: a single failure is raised as-is,
    several are raised together as an exception group.

    Inside a test this isn't very interesting, because you can just use
    a finally block, but note that you can use this inside map, flatmap,
    etc. in order to e.g. insist that a value is closed at the end.

    Raises ``InvalidArgument`` when called outside of a build context.
    """
    if (context := _current_build_context.value) is None:
        raise InvalidArgument("Cannot register cleanup outside of build context")
    context.tasks.append(teardown)

265 

266 

def should_note() -> bool:
    """Decide whether ``note()`` output should be reported right now.

    Notes are wanted when the active build context is the final one, or when
    the default settings have a verbosity of at least ``Verbosity.verbose``.
    Raises ``InvalidArgument`` when there is no active build context.
    """
    if (context := _current_build_context.value) is None:
        raise InvalidArgument("Cannot make notes outside of a test")
    assert settings.default is not None
    return context.is_final or settings.default.verbosity >= Verbosity.verbose

273 

274 

def note(value: object) -> None:
    """Report this value for the minimal failing example.

    Strings are reported unchanged; any other value is pretty-printed first.
    Nothing is reported when ``should_note()`` is false.
    """
    if not should_note():
        return
    text = value if isinstance(value, str) else pretty(value)
    report(text)

281 

282 

def event(value: str, payload: Any = "") -> None:
    """Record an event that occurred during this test.

    When Hypothesis runs in statistics reporting mode, statistics on the
    number of test runs with each event are reported at the end.

    Event values should be strings or convertible to them. If an optional
    payload is given, it will be included in the string for :ref:`statistics`.

    Raises ``InvalidArgument`` when called outside of a test.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot record events outside of a test")

    data = context.data
    skip_cache = data.provider.avoid_realization
    # Payloads may stay numeric; the event itself is always turned into a str.
    payload = _serialize_event(
        payload, allowed_types=(str, int, float), avoid_realization=skip_cache
    )
    value = _serialize_event(value, avoid_realization=skip_cache)
    data.events[value] = payload

301 

302 

# Cache of str() results for events, keyed weakly by the event object so that
# an entry goes away together with its event.
_events_to_strings: WeakKeyDictionary[Any, str] = WeakKeyDictionary()


def _serialize_event(
    event: Any, *, allowed_types: tuple[type, ...] = (str,), avoid_realization: bool
) -> Any:
    """Return ``event`` unchanged if it has an allowed type, else ``str(event)``.

    Conversions are memoised in ``_events_to_strings`` whenever the event can
    be used as a weak dictionary key. With ``avoid_realization`` set, the
    cache is neither read nor written.
    """
    if isinstance(event, allowed_types):
        return event

    # _events_to_strings is a cache which persists across iterations, causing
    # problems for symbolic backends. see
    # https://github.com/pschanely/hypothesis-crosshair/issues/41
    if avoid_realization:
        return str(event)

    try:
        cached = _events_to_strings[event]
    except (KeyError, TypeError):
        # Not cached yet, or not usable as a weak key at all.
        pass
    else:
        return cached

    text = str(event)
    try:
        _events_to_strings[event] = text
    except TypeError:
        # Events which cannot be weak keys are simply converted every time.
        pass
    return text

329 

330 

def target(observation: int | float, *, label: str = "") -> int | float:
    """Report an ``int`` or ``float`` observation to guide the input search.

    The observation is used as feedback, in addition to all the usual
    heuristics, when searching for inputs that will cause an error.
    Observations must always be finite.

    Hypothesis will try to maximize the observed value over several examples;
    almost any metric will work so long as it makes sense to increase it.
    For example, ``-abs(error)`` is a metric that increases as ``error``
    approaches zero.

    Example metrics:

    - Number of elements in a collection, or tasks in a queue
    - Mean or maximum runtime of a task (or both, if you use ``label``)
    - Compression ratio for data (perhaps per-algorithm or per-level)
    - Number of steps taken by a state machine

    The optional ``label`` argument can be used to distinguish between
    and therefore separately optimise distinct observations, such as the
    mean and standard deviation of a dataset. It is an error to call
    ``target()`` with any label more than once per test case.

    The observation is returned unchanged. ``InvalidArgument`` is raised for
    a non-finite observation, for a call outside of a test, and for a label
    that already has an observation in this test case.

    .. note::
        The more examples you run, the better this technique works.

        As a rule of thumb, the targeting effect is noticeable above
        :obj:`max_examples=1000 <hypothesis.settings.max_examples>`,
        and immediately obvious by around ten thousand examples
        *per label* used by your test.

    :ref:`statistics` include the best score seen for each label,
    which can help avoid `the threshold problem
    <https://hypothesis.works/articles/threshold-problem/>`__ when the minimal
    example shrinks right down to the threshold of failure (:issue:`2180`).
    """
    check_type((int, float), observation, "observation")
    if not math.isfinite(observation):
        raise InvalidArgument(f"{observation=} must be a finite float.")
    check_type(str, label, "label")

    context = _current_build_context.value
    if context is None:
        raise InvalidArgument(
            "Calling target() outside of a test is invalid.  "
            "Consider guarding this call with `if currently_in_test_context(): ...`"
        )
    if context.data.provider.avoid_realization:
        # We could in principle realize this in the engine, but it seems more
        # efficient to have our alternative backend optimize it for us.
        # See e.g. https://github.com/pschanely/hypothesis-crosshair/issues/3
        return observation  # pragma: no cover
    verbose_report(f"Saw target({observation!r}, {label=})")

    recorded = context.data.target_observations
    if label in recorded:
        raise InvalidArgument(
            f"Calling target({observation!r}, {label=}) would overwrite "
            f"target({recorded[label]!r}, {label=})"
        )
    recorded[label] = observation

    return observation