Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/control.py: 51%


# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import inspect
import math
import random
from collections import defaultdict
from collections.abc import Callable, Sequence
from contextlib import contextmanager
from typing import Any, Literal, NoReturn, Optional, overload
from weakref import WeakKeyDictionary

from hypothesis import Verbosity, settings
from hypothesis._settings import note_deprecation
from hypothesis.errors import InvalidArgument, UnsatisfiedAssumption
from hypothesis.internal.compat import BaseExceptionGroup
from hypothesis.internal.conjecture.data import ConjectureData
from hypothesis.internal.observability import observability_enabled
from hypothesis.internal.reflection import get_pretty_function_description
from hypothesis.internal.validation import check_type
from hypothesis.reporting import report, verbose_report
from hypothesis.utils.dynamicvariables import DynamicVariable
from hypothesis.vendor.pretty import IDKey, PrettyPrintFunction, pretty



def _calling_function_location(what: str, frame: Any) -> str:
    where = frame.f_back
    return f"{what}() in {where.f_code.co_name} (line {where.f_lineno})"



def reject() -> NoReturn:
    if _current_build_context.value is None:
        note_deprecation(
            "Using `reject` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    where = _calling_function_location("reject", inspect.currentframe())
    if currently_in_test_context():
        counts = current_build_context().data._observability_predicates[where]
        counts.update_count(condition=False)
    raise UnsatisfiedAssumption(where)


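# Illustrative usage (not part of this module): calling `reject()` discards the
# current example mid-test, much like `assume(False)`. The sketch below assumes a
# hypothetical `parse_config` / `render_config` pair and an `UnsupportedDialectError`
# exception; none of those names come from Hypothesis itself.
#
#     from hypothesis import given, strategies as st
#     from hypothesis.control import reject
#
#     @given(st.text())
#     def test_parse_config_roundtrip(raw):
#         try:
#             config = parse_config(raw)  # hypothetical function under test
#         except UnsupportedDialectError:
#             reject()  # discard inputs the parser is not expected to handle
#         assert parse_config(render_config(config)) == config

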

@overload
def assume(condition: Literal[False] | None) -> NoReturn: ...
@overload
def assume(condition: object) -> Literal[True]: ...



def assume(condition: object) -> Literal[True]:
    """Calling ``assume`` is like an :ref:`assert <python:assert>` that marks
    the example as bad, rather than failing the test.

    This allows you to specify properties that you *assume* will be
    true, and let Hypothesis try to avoid similar examples in future.
    """
    if _current_build_context.value is None:
        note_deprecation(
            "Using `assume` outside a property-based test is deprecated",
            since="2023-09-25",
            has_codemod=False,
        )
    if observability_enabled() or not condition:
        where = _calling_function_location("assume", inspect.currentframe())
        if observability_enabled() and currently_in_test_context():
            counts = current_build_context().data._observability_predicates[where]
            counts.update_count(condition=bool(condition))
        if not condition:
            raise UnsatisfiedAssumption(f"failed to satisfy {where}")
    return True


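# Illustrative usage (not part of this module): inside an `@given` test, a falsey
# argument to `assume` aborts the current example instead of failing the test, and
# Hypothesis steers away from similar inputs in later examples. A minimal sketch:
#
#     from hypothesis import given, strategies as st
#     from hypothesis.control import assume
#
#     @given(st.integers(), st.integers())
#     def test_integer_division(a, b):
#         assume(b != 0)  # skip b == 0 rather than raising ZeroDivisionError
#         assert (a // b) * b + (a % b) == a

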

_current_build_context = DynamicVariable[Optional["BuildContext"]](None)


def currently_in_test_context() -> bool:
    """Return ``True`` if the calling code is currently running inside an
    |@given| or :ref:`stateful <stateful>` test, and ``False`` otherwise.

    This is useful for third-party integrations and assertion helpers which
    may be called from either traditional or property-based tests, and can only
    use e.g. |assume| or |target| in the latter case.
    """
    return _current_build_context.value is not None


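# Illustrative usage (not part of this module): a shared assertion helper can use
# `currently_in_test_context()` to decide whether property-test-only calls such as
# `target()` are safe. The helper name below is hypothetical, and it assumes the
# helper is called at most once per test case (labels must be unique per case).
#
#     from hypothesis import target
#     from hypothesis.control import currently_in_test_context
#
#     def check_close(actual, expected, tolerance=1e-6):
#         error = abs(actual - expected)
#         if currently_in_test_context():
#             target(-error, label="closeness")  # only valid under @given
#         assert error <= tolerance

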

def current_build_context() -> "BuildContext":
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("No build context registered")
    return context



@contextmanager
def deprecate_random_in_strategy(fmt, *args):
    from hypothesis.internal import entropy

    state_before = random.getstate()
    yield
    state_after = random.getstate()
    if (
        # there is a threading race condition here with deterministic_PRNG. Say
        # we have two threads 1 and 2. We start in global random state A, and
        # deterministic_PRNG sets to global random state B (which is constant across
        # threads since we seed to 0 unconditionally). Then we might have state
        # transitions:
        #
        #        [1]                   [2]
        #       A -> B                                    deterministic_PRNG().__enter__
        #                             B -> B              deterministic_PRNG().__enter__
        #                             state_before = B    deprecate_random_in_strategy.__enter__
        #       B -> A                                    deterministic_PRNG().__exit__
        #                             state_after = A     deprecate_random_in_strategy.__exit__
        #
        # where state_before != state_after because a different thread has reset
        # the global random state.
        #
        # To fix this, we track the known random states set by deterministic_PRNG,
        # and will not note a deprecation if it matches one of those.
        state_after != state_before
        and hash(state_after) not in entropy._known_random_state_hashes
    ):
        note_deprecation(
            "Do not use the `random` module inside strategies; instead "
            "consider `st.randoms()`, `st.sampled_from()`, etc. " + fmt.format(*args),
            since="2024-02-05",
            has_codemod=False,
            stacklevel=1,
        )



class BuildContext:
    def __init__(
        self,
        data: ConjectureData,
        *,
        is_final: bool = False,
        wrapped_test: Callable,
    ) -> None:
        self.data = data
        self.tasks: list[Callable[[], Any]] = []
        self.is_final = is_final
        self.wrapped_test = wrapped_test

        # Use defaultdict(list) here to handle the possibility of having multiple
        # functions registered for the same object (due to caching, small ints, etc).
        # The printer will discard duplicates which return different representations.
        self.known_object_printers: dict[IDKey, list[PrettyPrintFunction]] = (
            defaultdict(list)
        )


    def record_call(
        self,
        obj: object,
        func: object,
        *,
        args: Sequence[object],
        kwargs: dict[str, object],
    ) -> None:
        self.known_object_printers[IDKey(obj)].append(
            # _func=func prevents mypy from inferring the lambda's type; we'd need
            # ParamSpec to express it properly, which isn't worth it here.
            lambda obj, p, cycle, *, _func=func: p.maybe_repr_known_object_as_call(  # type: ignore
                obj, cycle, get_pretty_function_description(_func), args, kwargs
            )
        )


    def prep_args_kwargs_from_strategies(self, kwarg_strategies):
        arg_labels = {}
        kwargs = {}
        for k, s in kwarg_strategies.items():
            start_idx = len(self.data.nodes)
            with deprecate_random_in_strategy("from {}={!r}", k, s):
                obj = self.data.draw(s, observe_as=f"generate:{k}")
            end_idx = len(self.data.nodes)
            kwargs[k] = obj

            # This high up the stack, we can't see or really do much with the conjecture
            # Example objects - not least because they're only materialized after the
            # test case is completed. Instead, we stash the (start_idx, end_idx) pair
            # on our data object for the ConjectureRunner engine to deal with, and pass
            # a dict of such pairs out so that the pretty-printer knows where to place
            # the which-parts-matter comments later.
            if start_idx != end_idx:
                arg_labels[k] = (start_idx, end_idx)
                self.data.arg_slices.add((start_idx, end_idx))

        return kwargs, arg_labels


    def __enter__(self):
        self.assign_variable = _current_build_context.with_value(self)
        self.assign_variable.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.assign_variable.__exit__(exc_type, exc_value, tb)
        errors = []
        for task in self.tasks:
            try:
                task()
            except BaseException as err:
                errors.append(err)
        if errors:
            if len(errors) == 1:
                raise errors[0] from exc_value
            raise BaseExceptionGroup("Cleanup failed", errors) from exc_value



def cleanup(teardown):
    """Register a function to be called when the current test has finished
    executing. Any exceptions thrown in teardown will be printed but not
    rethrown.

    Inside a test this isn't very interesting, because you can just use
    a finally block, but note that you can use this inside map, flatmap,
    etc. in order to e.g. insist that a value is closed at the end.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot register cleanup outside of build context")
    context.tasks.append(teardown)


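# Illustrative usage (not part of this module): `cleanup` is mostly useful inside
# strategy `.map()` / `.flatmap()` calls, where there is no enclosing try/finally.
# A minimal sketch assuming a temporary-file-backed resource:
#
#     import tempfile
#     from hypothesis import strategies as st
#     from hypothesis.control import cleanup
#
#     def open_scratch_file(_):
#         f = tempfile.TemporaryFile()
#         cleanup(f.close)  # closed when the current test case finishes
#         return f
#
#     scratch_files = st.integers().map(open_scratch_file)

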

def should_note():
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot make notes outside of a test")
    return context.is_final or settings.default.verbosity >= Verbosity.verbose


def note(value: object) -> None:
    """Report this value for the minimal failing example."""
    if should_note():
        if not isinstance(value, str):
            value = pretty(value)
        report(value)


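# Illustrative usage (not part of this module): `note` attaches extra context to the
# report for the final, minimal failing example (and to every example when running
# at verbose settings). A minimal sketch:
#
#     from hypothesis import given, note, strategies as st
#
#     @given(st.lists(st.integers()))
#     def test_sorted_is_idempotent(xs):
#         once = sorted(xs)
#         note(f"sorted once: {once!r}")
#         assert sorted(once) == once

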

def event(value: str, payload: str | int | float = "") -> None:
    """Record an event that occurred during this test. Statistics on the number of test
    runs with each event will be reported at the end if you run Hypothesis in
    statistics reporting mode.

    Event values should be strings or convertible to them. If an optional
    payload is given, it will be included in the string for :ref:`statistics`.
    """
    context = _current_build_context.value
    if context is None:
        raise InvalidArgument("Cannot record events outside of a test")

    avoid_realization = context.data.provider.avoid_realization
    payload = _event_to_string(
        payload, allowed_types=(str, int, float), avoid_realization=avoid_realization
    )
    value = _event_to_string(value, avoid_realization=avoid_realization)
    context.data.events[value] = payload


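# Illustrative usage (not part of this module): `event` tags each test case so that
# end-of-run statistics (e.g. via pytest's `--hypothesis-show-statistics`) report how
# often each situation was exercised. A minimal sketch:
#
#     from hypothesis import event, given, strategies as st
#
#     @given(st.integers())
#     def test_classifies_inputs(n):
#         event("sign", payload="negative" if n < 0 else "non-negative")
#         assert isinstance(n, int)

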

_events_to_strings: WeakKeyDictionary = WeakKeyDictionary()


def _event_to_string(event, *, allowed_types=str, avoid_realization):
    if isinstance(event, allowed_types):
        return event

    # _events_to_strings is a cache which persists across iterations, causing
    # problems for symbolic backends. see
    # https://github.com/pschanely/hypothesis-crosshair/issues/41
    if avoid_realization:
        return str(event)

    try:
        return _events_to_strings[event]
    except (KeyError, TypeError):
        pass

    result = str(event)
    try:
        _events_to_strings[event] = result
    except TypeError:
        pass
    return result



def target(observation: int | float, *, label: str = "") -> int | float:
    """Calling this function with an ``int`` or ``float`` observation gives it feedback
    with which to guide our search for inputs that will cause an error, in
    addition to all the usual heuristics. Observations must always be finite.

    Hypothesis will try to maximize the observed value over several examples;
    almost any metric will work so long as it makes sense to increase it.
    For example, ``-abs(error)`` is a metric that increases as ``error``
    approaches zero.

    Example metrics:

    - Number of elements in a collection, or tasks in a queue
    - Mean or maximum runtime of a task (or both, if you use ``label``)
    - Compression ratio for data (perhaps per-algorithm or per-level)
    - Number of steps taken by a state machine

    The optional ``label`` argument can be used to distinguish between
    and therefore separately optimise distinct observations, such as the
    mean and standard deviation of a dataset. It is an error to call
    ``target()`` with any label more than once per test case.

    .. note::
        The more examples you run, the better this technique works.

        As a rule of thumb, the targeting effect is noticeable above
        :obj:`max_examples=1000 <hypothesis.settings.max_examples>`,
        and immediately obvious by around ten thousand examples
        *per label* used by your test.

    :ref:`statistics` include the best score seen for each label,
    which can help avoid `the threshold problem
    <https://hypothesis.works/articles/threshold-problem/>`__ when the minimal
    example shrinks right down to the threshold of failure (:issue:`2180`).
    """
    check_type((int, float), observation, "observation")
    if not math.isfinite(observation):
        raise InvalidArgument(f"{observation=} must be a finite float.")
    check_type(str, label, "label")

    context = _current_build_context.value
    if context is None:
        raise InvalidArgument(
            "Calling target() outside of a test is invalid. "
            "Consider guarding this call with `if currently_in_test_context(): ...`"
        )
    elif context.data.provider.avoid_realization:
        # We could in principle realize this in the engine, but it seems more
        # efficient to have our alternative backend optimize it for us.
        # See e.g. https://github.com/pschanely/hypothesis-crosshair/issues/3
        return observation  # pragma: no cover
    verbose_report(f"Saw target({observation!r}, {label=})")

    if label in context.data.target_observations:
        raise InvalidArgument(
            f"Calling target({observation!r}, {label=}) would overwrite "
            f"target({context.data.target_observations[label]!r}, {label=})"
        )
    else:
        context.data.target_observations[label] = observation

    return observation
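

# Illustrative usage (not part of this module): `target` lets a test report a score
# for Hypothesis to maximise across examples; distinct labels are optimised
# separately. A minimal sketch targeting two properties of the same input:
#
#     import math
#     from hypothesis import given, strategies as st, target
#
#     @given(st.lists(st.floats(min_value=-1e6, max_value=1e6), min_size=1))
#     def test_mean_is_finite(xs):
#         mean = sum(xs) / len(xs)
#         target(abs(mean), label="absolute mean")
#         target(float(len(xs)), label="list length")
#         assert math.isfinite(mean)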