Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/control.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

145 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import inspect 

12import math 

13import random 

14from collections import defaultdict 

15from collections.abc import Sequence 

16from contextlib import contextmanager 

17from typing import Any, Callable, NoReturn, Optional, Union 

18from weakref import WeakKeyDictionary 

19 

20from hypothesis import Verbosity, settings 

21from hypothesis._settings import note_deprecation 

22from hypothesis.errors import InvalidArgument, UnsatisfiedAssumption 

23from hypothesis.internal.compat import BaseExceptionGroup 

24from hypothesis.internal.conjecture.data import ConjectureData 

25from hypothesis.internal.observability import observability_enabled 

26from hypothesis.internal.reflection import get_pretty_function_description 

27from hypothesis.internal.validation import check_type 

28from hypothesis.reporting import report, verbose_report 

29from hypothesis.utils.dynamicvariables import DynamicVariable 

30from hypothesis.vendor.pretty import IDKey, PrettyPrintFunction, pretty 

31 

32 

def _calling_function_location(what: str, frame: Any) -> str:
    """Describe where the function named ``what`` was called from.

    ``frame`` is expected to be the frame of the function named ``what``;
    the frame one level up (``frame.f_back``) belongs to its caller, and the
    returned string names that caller's code object and current line number.
    """
    caller = frame.f_back
    caller_name = caller.f_code.co_name
    return f"{what}() in {caller_name} (line {caller.f_lineno})"

36 

37 

def reject() -> NoReturn:
    """Unconditionally abandon the current example.

    Always raises ``UnsatisfiedAssumption`` carrying a description of the
    call site. When no build context is active a deprecation is noted first;
    when one is active, the rejection is also counted against the call site
    in the test data's ``_observability_predicates``.
    """
    if _current_build_context.value is None:
        note_deprecation(
            "Using `reject` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    # currentframe() must be evaluated here so that the reported location is
    # the caller of reject(), not some helper.
    location = _calling_function_location("reject", inspect.currentframe())
    if currently_in_test_context():
        predicates = current_build_context().data._observability_predicates
        predicates[location].update_count(condition=False)
    raise UnsatisfiedAssumption(location)

50 

51 

def assume(condition: object) -> bool:
    """Calling ``assume`` is like an :ref:`assert <python:assert>`, except that
    a falsy condition marks the example as bad instead of failing the test.

    Use it to state properties that you *assume* hold for the inputs, so that
    Hypothesis can try to avoid similar examples in future.

    Returns ``True`` when the condition holds, and raises
    ``UnsatisfiedAssumption`` otherwise.
    """
    if _current_build_context.value is None:
        note_deprecation(
            "Using `assume` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    # Fast path: nothing to record and nothing to reject.
    if not observability_enabled() and condition:
        return True

    # currentframe() must be evaluated here so that the reported location is
    # the caller of assume(), not some helper.
    location = _calling_function_location("assume", inspect.currentframe())
    if observability_enabled() and currently_in_test_context():
        predicates = current_build_context().data._observability_predicates
        predicates[location].update_count(condition=bool(condition))
    if not condition:
        raise UnsatisfiedAssumption(f"failed to satisfy {location}")
    return True

73 

74 

# Holder for the active BuildContext. The default of None means that no build
# context is registered; BuildContext.__enter__ installs the instance through
# with_value() and BuildContext.__exit__ leaves that scope again.
_current_build_context = DynamicVariable[Optional["BuildContext"]](None)

76 

77 

def currently_in_test_context() -> bool:
    """Return ``True`` if the calling code is currently running inside an
    |@given| or :ref:`stateful <stateful>` test, and ``False`` otherwise.

    Third-party integrations and assertion helpers can be invoked from both
    traditional and property-based tests; this lets them check whether
    e.g. |assume| or |target| may be used, since those only work in the
    latter case.
    """
    active_context = _current_build_context.value
    return active_context is not None

87 

88 

def current_build_context() -> "BuildContext":
    """Return the active ``BuildContext``.

    Raises ``InvalidArgument`` when no build context is registered.
    """
    active = _current_build_context.value
    if active is not None:
        return active
    raise InvalidArgument("No build context registered")

94 

95 

@contextmanager
def deprecate_random_in_strategy(fmt, *args):
    """Note a deprecation if the ``with`` body changed the global random state.

    The state of the ``random`` module is captured before and after the body
    runs. If the two differ, and the new state is not one of the states
    recorded in ``entropy._known_random_state_hashes``, a deprecation is noted
    whose message ends with ``fmt.format(*args)``. If the body raises, the
    check is skipped.
    """
    from hypothesis.internal import entropy

    state_before = random.getstate()
    yield
    state_after = random.getstate()

    if state_after == state_before:
        return

    # there is a threading race condition here with deterministic_PRNG. Say
    # we have two threads 1 and 2. We start in global random state A, and
    # deterministic_PRNG sets to global random state B (which is constant across
    # threads since we seed to 0 unconditionally). Then we might have state
    # transitions:
    #
    #              [1]          [2]
    # A -> B                                deterministic_PRNG().__enter__
    #                         B -> B        deterministic_PRNG().__enter__
    # state_before = B                      deprecate_random_in_strategy.__enter__
    #                         B -> A        deterministic_PRNG().__exit__
    # state_after = A                       deprecate_random_in_strategy.__exit__
    #
    # where state_before != state_after because a different thread has reset
    # the global random state.
    #
    # To fix this, we track the known random states set by deterministic_PRNG,
    # and will not note a deprecation if it matches one of those.
    if hash(state_after) in entropy._known_random_state_hashes:
        return

    note_deprecation(
        "Do not use the `random` module inside strategies; instead "
        "consider `st.randoms()`, `st.sampled_from()`, etc.  " + fmt.format(*args),
        since="2024-02-05",
        has_codemod=False,
        stacklevel=1,
    )

132 

133 

class BuildContext:
    """State attached to one execution of a test, usable as a context manager.

    Entering installs the instance as the value of ``_current_build_context``
    (via ``with_value``); leaving exits that scope and then runs every
    callable collected in ``tasks``.
    """

    def __init__(
        self,
        data: ConjectureData,
        *,
        is_final: bool = False,
        wrapped_test: Callable,
    ) -> None:
        self.data = data
        self.is_final = is_final
        self.wrapped_test = wrapped_test
        # Callables to invoke from __exit__; ``cleanup`` appends to this list.
        self.tasks: list[Callable[[], Any]] = []

        # Use defaultdict(list) here to handle the possibility of having multiple
        # functions registered for the same object (due to caching, small ints, etc).
        # The printer will discard duplicates which return different representations.
        self.known_object_printers: dict[IDKey, list[PrettyPrintFunction]] = (
            defaultdict(list)
        )

    def record_call(
        self,
        obj: object,
        func: object,
        *,
        args: Sequence[object],
        kwargs: dict[str, object],
    ) -> None:
        """Register a printer that can show ``obj`` as ``func(*args, **kwargs)``.

        The printer is appended to the list kept for ``IDKey(obj)`` and
        delegates to ``p.maybe_repr_known_object_as_call``.
        """
        printers = self.known_object_printers[IDKey(obj)]
        printers.append(
            # _func=func prevents mypy from inferring lambda type. Would need
            # paramspec I think - not worth it.
            lambda obj, p, cycle, *, _func=func: p.maybe_repr_known_object_as_call(  # type: ignore
                obj, cycle, get_pretty_function_description(_func), args, kwargs
            )
        )

    def prep_args_kwargs_from_strategies(self, kwarg_strategies):
        """Draw one value per entry of ``kwarg_strategies``.

        Returns ``(kwargs, arg_labels)``: the drawn values keyed by argument
        name, and for every argument whose draw added nodes, the
        ``(start, end)`` node-index pair that it spans.
        """
        drawn = {}
        labels = {}
        for name, strategy in kwarg_strategies.items():
            first = len(self.data.nodes)
            with deprecate_random_in_strategy("from {}={!r}", name, strategy):
                value = self.data.draw(strategy, observe_as=f"generate:{name}")
            last = len(self.data.nodes)
            drawn[name] = value

            # This high up the stack, we can't see or really do much with the conjecture
            # Example objects - not least because they're only materialized after the
            # test case is completed.  Instead, we'll stash the (start_idx, end_idx)
            # pair on our data object for the ConjectureRunner engine to deal with, and
            # pass a dict of such out so that the pretty-printer knows where to place
            # the which-parts-matter comments later.
            if first == last:
                continue
            labels[name] = (first, last)
            self.data.arg_slices.add((first, last))

        return drawn, labels

    def __enter__(self):
        # Keep the with_value() manager so that __exit__ can close the very
        # same scope that was opened here.
        self.assign_variable = _current_build_context.with_value(self)
        self.assign_variable.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.assign_variable.__exit__(exc_type, exc_value, tb)

        # Attempt every task, even when an earlier one raised.
        failures = []
        for task in self.tasks:
            try:
                task()
            except BaseException as err:
                failures.append(err)

        if not failures:
            return
        # A lone failure is re-raised as-is; several are grouped. Either way
        # the error is chained from whatever exception ended the with-block.
        if len(failures) == 1:
            raise failures[0] from exc_value
        raise BaseExceptionGroup("Cleanup failed", failures) from exc_value

209 

210 

def cleanup(teardown):
    """Register a function to be called when the current test has finished
    executing.

    Registered functions are run by ``BuildContext.__exit__`` in the order in
    which they were registered. Every function is attempted even if an earlier
    one raised; afterwards a single exception is re-raised as-is, and several
    exceptions are combined into one ``BaseExceptionGroup``.

    Inside a test this isn't very interesting, because you can just use
    a finally block, but note that you can use this inside map, flatmap,
    etc. in order to e.g. insist that a value is closed at the end.

    Raises ``InvalidArgument`` if there is no active build context.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot register cleanup outside of build context")
    context.tasks.append(teardown)

224 

225 

def should_note():
    """Tell whether ``note`` output should currently be reported.

    True when the active build context is marked ``is_final`` or when the
    default settings' verbosity is at least ``Verbosity.verbose``. Raises
    ``InvalidArgument`` when there is no active build context.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot make notes outside of a test")
    final = context.is_final
    if final:
        return final
    return settings.default.verbosity >= Verbosity.verbose

231 

232 

def note(value: object) -> None:
    """Report this value for the minimal failing example."""
    if not should_note():
        return
    # Strings are reported verbatim; anything else is pretty-printed first.
    text = value if isinstance(value, str) else pretty(value)
    report(text)

239 

240 

def event(value: str, payload: Union[str, int, float] = "") -> None:
    """Record an event that occurred during this test. Statistics on the number of test
    runs with each event will be reported at the end if you run Hypothesis in
    statistics reporting mode.

    Event values should be strings or convertible to them.  If an optional
    payload is given, it will be included in the string for :ref:`statistics`.

    Raises ``InvalidArgument`` if called while no build context is active.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot record events outside of a test")

    # str/int/float payloads are stored unchanged; anything else is stringified.
    payload = _event_to_string(payload, (str, int, float))
    context.data.events[_event_to_string(value)] = payload

255 

256 

# Cache of ``str(event)`` results, keyed weakly by the event object.
_events_to_strings: WeakKeyDictionary = WeakKeyDictionary()


def _event_to_string(event, allowed_types=str):
    """Return ``event`` unchanged if it is an instance of ``allowed_types``,
    otherwise its string form.

    String forms are cached in ``_events_to_strings``. Objects that cannot be
    used as keys there (the lookup or store raises ``TypeError``) are simply
    converted on every call.
    """
    if isinstance(event, allowed_types):
        return event

    try:
        cached = _events_to_strings[event]
    except (KeyError, TypeError):
        pass
    else:
        return cached

    rendered = str(event)
    try:
        _events_to_strings[event] = rendered
    except TypeError:
        # Not usable as a weak key; skip caching.
        pass
    return rendered

273 

274 

def target(observation: Union[int, float], *, label: str = "") -> Union[int, float]:
    """Calling this function with an ``int`` or ``float`` observation gives it feedback
    with which to guide our search for inputs that will cause an error, in
    addition to all the usual heuristics.  Observations must always be finite.

    Hypothesis will try to maximize the observed value over several examples;
    almost any metric will work so long as it makes sense to increase it.
    For example, ``-abs(error)`` is a metric that increases as ``error``
    approaches zero.

    Example metrics:

    - Number of elements in a collection, or tasks in a queue
    - Mean or maximum runtime of a task (or both, if you use ``label``)
    - Compression ratio for data (perhaps per-algorithm or per-level)
    - Number of steps taken by a state machine

    The optional ``label`` argument can be used to distinguish between
    and therefore separately optimise distinct observations, such as the
    mean and standard deviation of a dataset.  It is an error to call
    ``target()`` with any label more than once per test case.

    .. note::
        The more examples you run, the better this technique works.

        As a rule of thumb, the targeting effect is noticeable above
        :obj:`max_examples=1000 <hypothesis.settings.max_examples>`,
        and immediately obvious by around ten thousand examples
        *per label* used by your test.

    :ref:`statistics` include the best score seen for each label,
    which can help avoid `the threshold problem
    <https://hypothesis.works/articles/threshold-problem/>`__ when the minimal
    example shrinks right down to the threshold of failure (:issue:`2180`).
    """
    # Validate the arguments before looking at the context.
    check_type((int, float), observation, "observation")
    if not math.isfinite(observation):
        raise InvalidArgument(f"{observation=} must be a finite float.")
    check_type(str, label, "label")

    context = _current_build_context.value
    if context is None:
        raise InvalidArgument(
            "Calling target() outside of a test is invalid.  "
            "Consider guarding this call with `if currently_in_test_context(): ...`"
        )
    if context.data.provider.avoid_realization:
        # We could in principle realize this in the engine, but it seems more
        # efficient to have our alternative backend optimize it for us.
        # See e.g. https://github.com/pschanely/hypothesis-crosshair/issues/3
        return observation  # pragma: no cover
    verbose_report(f"Saw target({observation!r}, {label=})")

    # Each label may be recorded at most once per test case.
    recorded = context.data.target_observations
    if label in recorded:
        raise InvalidArgument(
            f"Calling target({observation!r}, {label=}) would overwrite "
            f"target({recorded[label]!r}, {label=})"
        )
    recorded[label] = observation

    return observation