Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/strategies/_internal/strategies.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

493 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import sys 

12import threading 

13import warnings 

14from collections import abc, defaultdict 

15from collections.abc import Sequence 

16from functools import lru_cache 

17from random import shuffle 

18from threading import RLock 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 Callable, 

23 ClassVar, 

24 Generic, 

25 Literal, 

26 Optional, 

27 TypeVar, 

28 Union, 

29 cast, 

30 overload, 

31) 

32 

33from hypothesis._settings import HealthCheck, Phase, Verbosity, settings 

34from hypothesis.control import _current_build_context, current_build_context 

35from hypothesis.errors import ( 

36 HypothesisException, 

37 HypothesisWarning, 

38 InvalidArgument, 

39 NonInteractiveExampleWarning, 

40 UnsatisfiedAssumption, 

41) 

42from hypothesis.internal.conjecture import utils as cu 

43from hypothesis.internal.conjecture.data import ConjectureData 

44from hypothesis.internal.conjecture.utils import ( 

45 calc_label_from_cls, 

46 calc_label_from_hash, 

47 calc_label_from_name, 

48 combine_labels, 

49) 

50from hypothesis.internal.coverage import check_function 

51from hypothesis.internal.reflection import ( 

52 get_pretty_function_description, 

53 is_identity_function, 

54) 

55from hypothesis.strategies._internal.utils import defines_strategy 

56from hypothesis.utils.conventions import UniqueIdentifier 

57 

if TYPE_CHECKING:
    from typing import TypeAlias

    # The ``default=`` argument is only supplied while type checking; at
    # runtime (the ``else`` branch) ``Ex`` is created without it.
    # NOTE(review): presumably because the runtime ``TypeVar`` on supported
    # Python versions does not accept ``default=`` - confirm.
    Ex = TypeVar("Ex", covariant=True, default=Any)
else:
    Ex = TypeVar("Ex", covariant=True)

# Generic type variables used in annotations in this module.
T = TypeVar("T")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
MappedFrom = TypeVar("MappedFrom")
MappedTo = TypeVar("MappedTo")
# Signature of the ``recur`` callback handed to the ``calc_*`` methods:
# it takes a strategy and returns that strategy's value for the property.
RecurT: "TypeAlias" = Callable[["SearchStrategy"], bool]
# Marker meaning "this value is currently being computed". It is stored in
# place of a real value by ``recursive_property`` and ``SearchStrategy.label``
# so that re-entrant computation can be detected.
calculating = UniqueIdentifier("calculating")

# Labels derived from fixed descriptive names.
MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "another attempted draw in MappedStrategy"
)

FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "single loop iteration in FilteredStrategy"
)

# Held while ``SearchStrategy.label`` computes a label. It is re-entrant, so
# a label calculation that recurses into ``.label`` on the same thread does
# not deadlock.
label_lock = RLock()

83 

84 

def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any:
    """Handle properties which may be mutually recursive among a set of
    strategies.

    These are essentially lazily cached properties, with the ability to set
    an override: If the property has not been explicitly set, we calculate
    it on first access and memoize the result for later.

    The problem is that for properties that depend on each other, a naive
    calculation strategy may hit infinite recursion. Consider for example
    the property is_empty. A strategy defined as x = st.deferred(lambda: x)
    is certainly empty (in order to draw a value from x we would have to
    draw a value from x, for which we would have to draw a value from x,
    ...), but in order to calculate it the naive approach would end up
    calling x.is_empty in order to calculate x.is_empty in order to etc.

    The solution is one of fixed point calculation. We start with a default
    value that is the value of the property in the absence of evidence to
    the contrary, and then update the values of the property for all
    dependent strategies until we reach a fixed point.

    The approach taken roughly follows that in section 4.2 of Adams,
    Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
    and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
    (2016): 224-236.

    Args:
        strategy: The strategy whose property value is requested.
        name: The property name; must be one of ``"is_empty"``,
            ``"has_reusable_values"`` or ``"is_cacheable"``.
        default: The value assumed for a strategy whose calculation is
            re-entered while still in progress, and for strategies first
            encountered during the fixed point phase.

    Returns:
        The ``force_<name>`` attribute of ``strategy`` if it is set, else its
        ``cached_<name>`` attribute if that is set, else the freshly computed
        value. In the last case ``cached_<name>`` is set on every strategy
        visited during the calculation, not only on ``strategy``.
    """
    assert name in {"is_empty", "has_reusable_values", "is_cacheable"}
    # Attribute / method names derived from the property name.
    cache_key = "cached_" + name
    calculation = "calc_" + name
    force_key = "force_" + name

    def forced_value(target: SearchStrategy) -> Any:
        # An explicit override wins over a cached value. If neither attribute
        # exists, the AttributeError from the second getattr propagates and
        # tells the caller that the value still needs calculating.
        try:
            return getattr(target, force_key)
        except AttributeError:
            return getattr(target, cache_key)

    try:
        return forced_value(strategy)
    except AttributeError:
        pass

    # Values computed so far during this call, keyed by strategy.
    mapping: dict[SearchStrategy, Any] = {}
    # Distinguishes "not in mapping" from any legitimately stored value.
    sentinel = object()
    hit_recursion = False

    # For a first pass we do a direct recursive calculation of the
    # property, but we block recursively visiting a value in the
    # computation of its property: When that happens, we simply
    # note that it happened and return the default value.
    def recur(strat: SearchStrategy) -> Any:
        nonlocal hit_recursion
        try:
            return forced_value(strat)
        except AttributeError:
            pass
        result = mapping.get(strat, sentinel)
        if result is calculating:
            hit_recursion = True
            return default
        elif result is sentinel:
            # Mark as in-progress *before* recursing so that a cycle back to
            # this strategy is detected by the branch above.
            mapping[strat] = calculating
            mapping[strat] = getattr(strat, calculation)(recur)
            return mapping[strat]
        return result

    recur(strategy)

    # If we hit self-recursion in the computation of any strategy
    # value, our mapping at the end is imprecise - it may or may
    # not have the right values in it. We now need to proceed with
    # a more careful fixed point calculation to get the exact
    # values. Hopefully our mapping is still pretty good and it
    # won't take a large number of updates to reach a fixed point.
    if hit_recursion:
        needs_update = set(mapping)

        # We track which strategies use which in the course of
        # calculating their property value. If A ever uses B in
        # the course of calculating its value, then whenever the
        # value of B changes we might need to update the value of
        # A.
        listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set)
    else:
        # No recursion was hit: the loop below is skipped entirely, so
        # ``listeners`` is never needed.
        needs_update = None

    def recur2(strat: SearchStrategy) -> Any:
        # Build the callback passed to ``strat``'s calc method during the
        # fixed point phase. It records ``strat`` as a listener of every
        # strategy it asks about.
        def recur_inner(other: SearchStrategy) -> Any:
            try:
                return forced_value(other)
            except AttributeError:
                pass
            listeners[other].add(strat)
            result = mapping.get(other, sentinel)
            if result is sentinel:
                # First time we see ``other``: assume the default for now and
                # schedule it for calculation in a later round.
                assert needs_update is not None
                needs_update.add(other)
                mapping[other] = default
                return default
            return result

        return recur_inner

    count = 0
    seen = set()
    while needs_update:
        count += 1
        # If we seem to be taking a really long time to stabilize we
        # start tracking seen values to attempt to detect an infinite
        # loop. This should be impossible, and most code will never
        # hit the count, but having an assertion for it means that
        # testing is easier to debug and we don't just have a hung
        # test.
        # Note: This is actually covered, by test_very_deep_deferral
        # in tests/cover/test_deferred_strategies.py. Unfortunately it
        # runs into a coverage bug. See
        # https://github.com/nedbat/coveragepy/issues/605
        # for details.
        if count > 50:  # pragma: no cover
            key = frozenset(mapping.items())
            assert key not in seen, (key, name)
            seen.add(key)
        # Swap in a fresh set so that strategies scheduled while processing
        # this round are handled in the next one.
        to_update = needs_update
        needs_update = set()
        for strat in to_update:
            new_value = getattr(strat, calculation)(recur2(strat))
            if new_value != mapping[strat]:
                # The value changed, so everything that read it may be stale.
                needs_update.update(listeners[strat])
                mapping[strat] = new_value

    # We now have a complete and accurate calculation of the
    # property values for everything we have seen in the course of
    # running this calculation. We simultaneously update all of
    # them (not just the strategy we started out with).
    for k, v in mapping.items():
        setattr(k, cache_key, v)
    return getattr(strategy, cache_key)

222 

223 

class SearchStrategy(Generic[Ex]):
    """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input.

    This class is only part of the public API for use in type annotations, so that
    you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns
    ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class.
    """

    __module__: str = "hypothesis.strategies"
    # Per-class labels computed by ``class_label``, keyed by the class itself.
    LABELS: ClassVar[dict[type, int]] = {}
    # triggers `assert isinstance(label, int)` under threading when setting this
    # in init instead of a classvar. I'm not sure why, init should be safe. But
    # this works so I'm not looking into it further atm.
    __label: Union[int, UniqueIdentifier, None] = None

    def __init__(self) -> None:
        # Maps a thread id to whether ``validate`` has already run (or is
        # currently running) on that thread; see ``validate``.
        self.validate_called: dict[int, bool] = {}

    def _available(self, data: ConjectureData) -> bool:
        """Returns whether this strategy can *currently* draw any
        values. This is typically useful for stateful testing, where a
        ``Bundle`` grows a list of values to choose from over time.

        Unlike the ``is_empty`` property, this method's return value may
        change over time.
        Note: the ``data`` parameter will only be used for introspection and
        no value is drawn from it.
        """
        return not self.is_empty

    @property
    def is_empty(self) -> Any:
        # Returns True if this strategy can never draw a value and will always
        # result in the data being marked invalid.
        # The fact that this returns False does not guarantee that a valid value
        # can be drawn - this is not intended to be perfect, and is primarily
        # intended to be an optimisation for some cases.
        return recursive_property(self, "is_empty", True)

    @property
    def supports_find(self) -> bool:
        # The base implementation always returns True.
        return True

    # True if values from this strategy can be implicitly reused (e.g. as
    # background values in a numpy array) without causing surprising
    # user-visible behaviour. Should be false for built-in strategies that
    # produce mutable values, and for strategies that have been mapped/filtered
    # by arbitrary user-provided functions.
    @property
    def has_reusable_values(self) -> Any:
        return recursive_property(self, "has_reusable_values", True)

    # Whether this strategy is suitable for holding onto in a cache.
    @property
    def is_cacheable(self) -> Any:
        return recursive_property(self, "is_cacheable", True)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        # Calculation hook looked up by name in ``recursive_property``.
        # ``recur`` returns the same property for another strategy. The base
        # implementation does not consult it.
        return True

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Note: It is correct and significant that the default return value
        # from calc_is_empty is False despite the default value for is_empty
        # being true. The reason for this is that strategies should be treated
        # as empty absent evidence to the contrary, but most basic strategies
        # are trivially non-empty and it would be annoying to have to override
        # this method to show that.
        return False

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        # Calculation hook looked up by name in ``recursive_property``. The
        # base implementation reports values as not reusable.
        return False

    def example(self) -> Ex:  # FIXME
        """Provide an example of the sort of value that this strategy generates.

        This method is designed for use in a REPL, and will raise an error if
        called from inside |@given| or a strategy definition. For serious use,
        see |@composite| or |st.data|.
        """
        # ``sys.ps1`` is only defined in an interactive interpreter session.
        if getattr(sys, "ps1", None) is None:  # pragma: no branch
            # The other branch *is* covered in cover/test_examples.py; but as that
            # uses `pexpect` for an interactive session `coverage` doesn't see it.
            warnings.warn(
                "The `.example()` method is good for exploring strategies, but should "
                "only be used interactively. We recommend using `@given` for tests - "
                "it performs better, saves and replays failures to avoid flakiness, "
                f"and reports minimal examples. (strategy: {self!r})",
                NonInteractiveExampleWarning,
                stacklevel=2,
            )

        # A non-None build context means we are being called while a test or
        # strategy is executing; both cases are rejected with a tailored
        # message.
        context = _current_build_context.value
        if context is not None:
            if context.data is not None and context.data.depth > 0:
                raise HypothesisException(
                    "Using example() inside a strategy definition is a bad "
                    "idea. Instead consider using hypothesis.strategies.builds() "
                    "or @hypothesis.strategies.composite to define your strategy."
                    " See https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#hypothesis.strategies.builds or "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#composite-strategies for more details."
                )
            else:
                raise HypothesisException(
                    "Using example() inside a test function is a bad "
                    "idea. Instead consider using hypothesis.strategies.data() "
                    "to draw more examples during testing. See "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#drawing-interactively-in-tests for more details."
                )

        # Serve from the batch generated by a previous call if any examples
        # remain. AttributeError means no batch has been generated yet;
        # IndexError means the previous batch is exhausted.
        try:
            return self.__examples.pop()
        except (AttributeError, IndexError):
            self.__examples: list[Ex] = []

        from hypothesis.core import given

        # Note: this function has a weird name because it might appear in
        # tracebacks, and we want users to know that they can ignore it.
        @given(self)
        @settings(
            database=None,
            # generate only a few examples at a time to avoid slow interactivity
            # for large strategies. The overhead of @given is very small relative
            # to generation, so a small batch size is fine.
            max_examples=10,
            deadline=None,
            verbosity=Verbosity.quiet,
            phases=(Phase.generate,),
            suppress_health_check=list(HealthCheck),
        )
        def example_generating_inner_function(
            ex: Ex,  # type: ignore # mypy is overzealous in preventing covariant params
        ) -> None:
            self.__examples.append(ex)

        example_generating_inner_function()
        shuffle(self.__examples)
        return self.__examples.pop()

    def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
        """Returns a new strategy which generates a value from this one, and
        then returns ``pack(value)``. For example, ``integers().map(str)``
        could generate ``str(5)`` == ``"5"``.
        """
        # Mapping through an identity function is a no-op, so skip the wrapper.
        if is_identity_function(pack):
            return self  # type: ignore  # Mypy has no way to know that `Ex == T`
        return MappedStrategy(self, pack=pack)

    def flatmap(
        self, expand: Callable[[Ex], "SearchStrategy[T]"]
    ) -> "SearchStrategy[T]":  # FIXME
        """Old syntax for a special case of |@composite|:

        .. code-block:: python

            @st.composite
            def flatmap_like(draw, base_strategy, expand):
                value = draw(base_strategy)
                new_strategy = expand(value)
                return draw(new_strategy)

        We find that the greater readability of |@composite| usually outweighs
        the verbosity, with a few exceptions for simple cases or recipes like
        ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for
        any instance of that type, and then generate one of those").
        """
        from hypothesis.strategies._internal.flatmapped import FlatMapStrategy

        return FlatMapStrategy(self, expand=expand)

    # Note that we previously had condition extracted to a type alias as
    # PredicateT. However, that was only useful when not specifying a relationship
    # between the generic Ts and some other function param / return value.
    # If we do want to - like here, where we want to say that the Ex arg to condition
    # is of the same type as the strategy's Ex - then you need to write out the
    # entire Callable[[Ex], Any] expression rather than use a type alias.
    # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not
    # reference the local TypeVar context.
    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
        """Returns a new strategy that generates values from this strategy
        which satisfy the provided condition.

        Note that if the condition is too hard to satisfy this might result
        in your tests failing with an Unsatisfiable exception.
        A basic version of the filtering logic would look something like:

        .. code-block:: python

            @st.composite
            def filter_like(draw, strategy, condition):
                for _ in range(3):
                    value = draw(strategy)
                    if condition(value):
                        return value
                assume(False)
        """
        return FilteredStrategy(conditions=(condition,), strategy=self)

    def _filter_for_filtered_draw(
        self, condition: Callable[[Ex], Any]
    ) -> "FilteredStrategy[Ex]":
        # Hook for parent strategies that want to perform fallible filtering
        # on one of their internal strategies (e.g. UniqueListStrategy).
        # The returned object must have a `.do_filtered_draw(data)` method
        # that behaves like `do_draw`, but returns the sentinel object
        # `filter_not_satisfied` if the condition could not be satisfied.

        # This is separate from the main `filter` method so that strategies
        # can override `filter` without having to also guarantee a
        # `do_filtered_draw` method.
        return FilteredStrategy(conditions=(condition,), strategy=self)

    @property
    def branches(self) -> Sequence["SearchStrategy[Ex]"]:
        # The base implementation is a one-element list holding this strategy.
        return [self]

    def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
        """Return a strategy which produces values by randomly drawing from one
        of this strategy or the other strategy.

        This method is part of the public API.
        """
        if not isinstance(other, SearchStrategy):
            raise ValueError(f"Cannot | a SearchStrategy with {other!r}")

        # Unwrap explicitly or'd strategies. This turns the
        # common case of e.g. st.integers() | st.integers() | st.integers() from
        #
        #     one_of(one_of(integers(), integers()), integers())
        #
        # into
        #
        #     one_of(integers(), integers(), integers())
        #
        # This is purely an aesthetic unwrapping, for e.g. reprs. In practice
        # we use .branches / .element_strategies to get the list of possible
        # strategies, so this unwrapping is *not* necessary for correctness.
        strategies: list[SearchStrategy] = []
        strategies.extend(
            self.original_strategies if isinstance(self, OneOfStrategy) else [self]
        )
        strategies.extend(
            other.original_strategies if isinstance(other, OneOfStrategy) else [other]
        )
        return OneOfStrategy(strategies)

    def __bool__(self) -> bool:
        # A strategy object is always truthy, but evaluating it in a boolean
        # context is likely a mistake, so emit a warning as well.
        warnings.warn(
            f"bool({self!r}) is always True, did you mean to draw a value?",
            HypothesisWarning,
            stacklevel=2,
        )
        return True

    def validate(self) -> None:
        """Throw an exception if the strategy is not valid.

        Strategies should implement ``do_validate``, which is called by this
        method. They should not override ``validate``.

        This can happen due to invalid arguments, or lazy construction.
        """
        thread_id = threading.get_ident()
        if self.validate_called.get(thread_id, False):
            return
        # we need to set validate_called before calling do_validate, for
        # recursive / deferred strategies. But if a thread switches after
        # validate_called but before do_validate, we might have a strategy
        # which does weird things like drawing when do_validate would error but
        # its params are technically valid (e.g. a param was passed as 1.0
        # instead of 1) and get into weird internal states.
        #
        # There are two ways to fix this.
        # (1) The first is a per-strategy lock around do_validate. Even though we
        #   expect near-zero lock contention, this still adds the lock overhead.
        # (2) The second is allowing concurrent .validate calls. Since validation
        #   is (assumed to be) deterministic, both threads will produce the same
        #   end state, so the validation order or race conditions does not matter.
        #
        # In order to avoid the lock overhead of (1), we use (2) here. See also
        # discussion in https://github.com/HypothesisWorks/hypothesis/pull/4473.
        try:
            self.validate_called[thread_id] = True
            self.do_validate()
            # Accessed for their side effect: evaluating these properties
            # computes and caches them (see ``recursive_property``).
            self.is_empty
            self.has_reusable_values
        except Exception:
            # Reset the flag so that a later call validates again, then
            # propagate the original error.
            self.validate_called[thread_id] = False
            raise

    @property
    def class_label(self) -> int:
        # Label for this strategy's class, computed once per class with
        # ``calc_label_from_cls`` and then served from ``LABELS``.
        cls = self.__class__
        try:
            return cls.LABELS[cls]
        except KeyError:
            pass
        result = calc_label_from_cls(cls)
        cls.LABELS[cls] = result
        return result

    @property
    def label(self) -> int:
        # Label for this strategy instance, computed by ``calc_label`` on
        # first access and cached afterwards.
        if isinstance((label := self.__label), int):
            # avoid locking if we've already completely computed the label.
            return label

        with label_lock:
            # ``label_lock`` is re-entrant, so a ``calc_label`` that reaches
            # back to this same strategy on the same thread lands here while
            # the marker is still set; 0 is returned in that case.
            if self.__label is calculating:
                return 0
            self.__label = calculating
            self.__label = self.calc_label()
            return self.__label

    def calc_label(self) -> int:
        # Base implementation: the instance label is just the class label.
        return self.class_label

    def do_validate(self) -> None:
        # No-op by default; called by ``validate``.
        pass

    def do_draw(self, data: ConjectureData) -> Ex:
        # The base class cannot draw anything; this always raises
        # NotImplementedError naming the concrete class.
        raise NotImplementedError(f"{type(self).__name__}.do_draw")

552 

553 

554def _is_hashable(value: object) -> tuple[bool, Optional[int]]: 

555 # hashing can be expensive; return the hash value if we compute it, so that 

556 # callers don't have to recompute. 

557 try: 

558 return (True, hash(value)) 

559 except TypeError: 

560 return (False, None) 

561 

562 

def is_hashable(value: object) -> bool:
    """Return whether ``value`` can be hashed (``hash(value)`` does not raise
    ``TypeError``)."""
    hashable, _ = _is_hashable(value)
    return hashable

565 

566 

class SampledFromStrategy(SearchStrategy[Ex]):
    """A strategy which samples from a set of elements. This is essentially
    equivalent to using a OneOfStrategy over Just strategies but may be more
    efficient and convenient.

    ``.map`` and ``.filter`` do not wrap this strategy; they return a new
    instance of the same class with the function appended to an internal
    list of transformations, which are applied to the chosen element at draw
    time.
    """

    # Upper bound on the number of elements examined per draw while trying to
    # satisfy filters (the three initial rejection-sampling attempts count
    # towards it).
    _MAX_FILTER_CALLS: ClassVar[int] = 10_000

    def __init__(
        self,
        elements: Sequence[Ex],
        *,
        force_repr: Optional[str] = None,
        force_repr_braces: Optional[tuple[str, str]] = None,
        transformations: tuple[
            tuple[Literal["filter", "map"], Callable[[Ex], Any]],
            ...,
        ] = (),
    ):
        """
        Args:
            elements: The values to sample from. Passed through
                ``cu.check_sample`` and must be non-empty afterwards.
            force_repr: If given, used in the repr in place of the default
                ``sampled_from(...)`` text.
            force_repr_braces: Opening and closing strings placed around the
                elements in the default repr; parentheses if not given.
            transformations: ``("map", f)`` / ``("filter", f)`` pairs applied
                in order to each chosen element.
        """
        super().__init__()
        self.elements = cu.check_sample(elements, "sampled_from")
        assert self.elements
        self.force_repr = force_repr
        self.force_repr_braces = force_repr_braces
        self._transformations = transformations

        self._cached_repr: Optional[str] = None

    def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]:
        """Return a copy of this strategy with ``pack`` appended as a map step."""
        s = type(self)(
            self.elements,
            force_repr=self.force_repr,
            force_repr_braces=self.force_repr_braces,
            transformations=(*self._transformations, ("map", pack)),
        )
        # guaranteed by the ("map", pack) transformation
        return cast(SearchStrategy[T], s)

    def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
        """Return a copy of this strategy with ``condition`` appended as a
        filter step."""
        return type(self)(
            self.elements,
            force_repr=self.force_repr,
            force_repr_braces=self.force_repr_braces,
            transformations=(*self._transformations, ("filter", condition)),
        )

    def __repr__(self) -> str:
        # The repr is built once and cached on the instance.
        if self._cached_repr is None:
            rep = get_pretty_function_description
            # Only the first 512 elements are shown for long sequences.
            elements_s = (
                ", ".join(rep(v) for v in self.elements[:512]) + ", ..."
                if len(self.elements) > 512
                else ", ".join(rep(v) for v in self.elements)
            )
            braces = self.force_repr_braces or ("(", ")")
            instance_s = (
                self.force_repr or f"sampled_from({braces[0]}{elements_s}{braces[1]})"
            )
            transforms_s = "".join(
                f".{name}({rep(f)})" for name, f in self._transformations
            )
            repr_s = instance_s + transforms_s
            self._cached_repr = repr_s
        return self._cached_repr

    def calc_label(self) -> int:
        # strategy.label is effectively an under-approximation of structural
        # equality (i.e., some strategies may have the same label when they are not
        # structurally identical). More importantly for calculating the
        # SampledFromStrategy label, we might have hash(s1) != hash(s2) even
        # when s1 and s2 are structurally identical. For instance:
        #
        #   s1 = st.sampled_from([st.none()])
        #   s2 = st.sampled_from([st.none()])
        #   assert hash(s1) != hash(s2)
        #
        # (see also test cases in test_labels.py).
        #
        # We therefore use the labels of any component strategies when calculating
        # our label, and only use the hash if it is not a strategy.
        #
        # That's the ideal, anyway. In reality the logic is more complicated than
        # necessary in order to be efficient in the presence of (very) large sequences:
        # * add an unabashed special case for range, to avoid iteration over an
        #   enormous range when we know it is entirely integers.
        # * if there is at least one strategy in self.elements, use strategy label,
        #   and the element hash otherwise.
        # * if there are no strategies in self.elements, take the hash of the
        #   entire sequence. This prevents worst-case performance of hashing each
        #   element when a hash of the entire sequence would have sufficed.
        #
        # The worst case performance of this scheme is
        # itertools.chain(range(2**100), [st.none()]), where it degrades to
        # hashing every int in the range.
        (elements_is_hashable, hash_value) = _is_hashable(self.elements)
        if isinstance(self.elements, range) or (
            elements_is_hashable
            and not any(isinstance(e, SearchStrategy) for e in self.elements)
        ):
            return combine_labels(
                self.class_label, calc_label_from_name(str(hash_value))
            )

        labels = [self.class_label]
        for element in self.elements:
            # Unhashable elements contribute nothing to the label.
            if not is_hashable(element):
                continue

            labels.append(
                element.label
                if isinstance(element, SearchStrategy)
                else calc_label_from_hash(element)
            )

        return combine_labels(*labels)

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        # Because our custom .map/.filter implementations skip the normal
        # wrapper strategies (which would automatically return False for us),
        # we need to manually return False here if any transformations have
        # been applied.
        return not self._transformations

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return is_hashable(self.elements)

    def _transform(
        self,
        # https://github.com/python/mypy/issues/7049, we're not writing `element`
        # anywhere in the class so this is still type-safe. mypy is being more
        # conservative than necessary
        element: Ex,  # type: ignore
    ) -> Union[Ex, UniqueIdentifier]:
        """Apply the recorded map/filter steps to ``element`` in order.

        Returns the transformed element, or ``filter_not_satisfied`` as soon
        as a filter step rejects the current value.
        """
        # Used in UniqueSampledListStrategy
        for name, f in self._transformations:
            if name == "map":
                result = f(element)
                # Record the call on the active build context, if there is one.
                if build_context := _current_build_context.value:
                    build_context.record_call(result, f, [element], {})
                element = result
            else:
                assert name == "filter"
                if not f(element):
                    return filter_not_satisfied
        return element

    def do_draw(self, data: ConjectureData) -> Ex:
        """Draw one element, marking ``data`` invalid if no element passes
        the filters."""
        result = self.do_filtered_draw(data)
        if isinstance(result, SearchStrategy) and all(
            isinstance(x, SearchStrategy) for x in self.elements
        ):
            # Every element is itself a strategy, which suggests the caller
            # wanted one_of(...). Stash a (format string, argument) pair on
            # ``data`` describing this.
            data._sampled_from_all_strategies_elements_message = (
                "sampled_from was given a collection of strategies: "
                "{!r}. Was one_of intended?",
                self.elements,
            )
        if result is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
        assert not isinstance(result, UniqueIdentifier)
        return result

    def get_element(self, i: int) -> Union[Ex, UniqueIdentifier]:
        """Return element ``i`` after applying the transformations, or
        ``filter_not_satisfied`` if a filter rejects it."""
        return self._transform(self.elements[i])

    def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
        """Like ``do_draw``, but return ``filter_not_satisfied`` instead of
        marking ``data`` invalid when no acceptable element is found."""
        # Set of indices that have been tried so far, so that we never test
        # the same element twice during a draw.
        known_bad_indices: set[int] = set()

        # Start with ordinary rejection sampling. It's fast if it works, and
        # if it doesn't work then it was only a small amount of overhead.
        for _ in range(3):
            i = data.draw_integer(0, len(self.elements) - 1)
            if i not in known_bad_indices:
                element = self.get_element(i)
                if element is not filter_not_satisfied:
                    return element
                # Record the retry event once per draw, on the first rejection.
                if not known_bad_indices:
                    data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
                known_bad_indices.add(i)

        # If we've tried all the possible elements, give up now.
        max_good_indices = len(self.elements) - len(known_bad_indices)
        if not max_good_indices:
            return filter_not_satisfied

        # Impose an arbitrary cutoff to prevent us from wasting too much time
        # on very large element lists.
        max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)

        # Before building the list of allowed indices, speculatively choose
        # one of them. We don't yet know how many allowed indices there will be,
        # so this choice might be out-of-bounds, but that's OK.
        speculative_index = data.draw_integer(0, max_good_indices - 1)

        # Calculate the indices of allowed values, so that we can choose one
        # of them at random. But if we encounter the speculatively-chosen one,
        # just use that and return immediately.  Note that we also track the
        # allowed elements, in case of .map(some_stateful_function)
        allowed: list[tuple[int, Ex]] = []
        for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
            if i not in known_bad_indices:
                element = self.get_element(i)
                if element is not filter_not_satisfied:
                    assert not isinstance(element, UniqueIdentifier)
                    allowed.append((i, element))
                    if len(allowed) > speculative_index:
                        # Early-exit case: We reached the speculative index, so
                        # we just return the corresponding element.
                        data.draw_integer(0, len(self.elements) - 1, forced=i)
                        return element

        # The speculative index didn't work out, but at this point we've built
        # and can choose from the complete list of allowed indices and elements.
        if allowed:
            i, element = data.choice(allowed)
            data.draw_integer(0, len(self.elements) - 1, forced=i)
            return element
        # If there are no allowed indices, the filter couldn't be satisfied.
        return filter_not_satisfied

788 

789 

class OneOfStrategy(SearchStrategy[Ex]):
    """Implements a union of strategies. Given a number of strategies this
    generates values which could have come from any of them.

    The conditional distribution draws uniformly at random from some
    non-empty subset of these strategies and then draws from the
    conditional distribution of that strategy.
    """

    def __init__(self, strategies: Sequence[SearchStrategy[Ex]]):
        """Remember ``strategies`` as a tuple; flattening happens lazily."""
        super().__init__()
        self.original_strategies = tuple(strategies)
        # Filled in on first access of ``element_strategies``.
        self.__element_strategies: Optional[Sequence[SearchStrategy[Ex]]] = None
        # Re-entrancy flag used by the ``branches`` property.
        self.__in_branches = False

    def calc_is_empty(self, recur: RecurT) -> bool:
        """Return True only if ``recur`` is true for every original strategy."""
        return all(map(recur, self.original_strategies))

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        """Return True only if ``recur`` is true for every original strategy."""
        return all(map(recur, self.original_strategies))

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        """Return True only if ``recur`` is true for every original strategy."""
        return all(map(recur, self.original_strategies))

    @property
    def element_strategies(self) -> Sequence[SearchStrategy[Ex]]:
        """The flattened, de-duplicated, non-empty branches to choose between.

        Computed on first access and cached on the instance afterwards.
        """
        cached = self.__element_strategies
        if cached is not None:
            return cached

        # Strategies are hashable, but only via object.__hash__, so the
        # de-duplication below is by identity and nothing more.
        #
        # A "real" __hash__ (plus an __eq__ built from type() and hash())
        # would make this more powerful, but it is harder than it sounds:
        #
        # 1. Strategies are often distinguished by non-hashable attributes,
        #    or by attributes that have the same hash value ("^.+" / b"^.+").
        # 2. LazyStrategy: can't reify the wrapped strategy without breaking
        #    laziness, so there's a hash each for the lazy and the nonlazy.
        #
        # After several attempts, the minor benefits of hashable strategies
        # were judged not worth the engineering effort.
        # See also issues #2291 and #2327.
        already_seen: set[SearchStrategy] = {self}
        flattened: list[SearchStrategy] = []
        for original in self.original_strategies:
            check_strategy(original)
            if original.is_empty:
                continue
            for branch in original.branches:
                if branch in already_seen or branch.is_empty:
                    continue
                already_seen.add(branch)
                flattened.append(branch)
        self.__element_strategies = flattened
        return flattened

    def calc_label(self) -> int:
        """Combine the class label with the label of each original strategy."""
        return combine_labels(
            self.class_label, *[child.label for child in self.original_strategies]
        )

    def do_draw(self, data: ConjectureData) -> Ex:
        """Pick one element strategy passing ``_available(data)`` and draw from it."""
        candidates = SampledFromStrategy(self.element_strategies)
        usable = candidates.filter(lambda s: s._available(data))
        chosen = data.draw(usable)
        return data.draw(chosen)

    def __repr__(self) -> str:
        inner = ", ".join(repr(child) for child in self.original_strategies)
        return f"one_of({inner})"

    def do_validate(self) -> None:
        """Validate every flattened element strategy."""
        for element in self.element_strategies:
            element.validate()

    @property
    def branches(self) -> Sequence[SearchStrategy[Ex]]:
        """The element strategies, or ``[self]`` when accessed re-entrantly.

        The flag guards against this property being reached again while the
        element strategies are still being computed.
        """
        if self.__in_branches:
            return [self]
        self.__in_branches = True
        try:
            return self.element_strategies
        finally:
            self.__in_branches = False

    def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
        """Apply ``condition`` to each original strategy individually.

        The per-strategy filtered union is wrapped in a ``FilteredStrategy``
        that carries no additional conditions of its own.
        """
        filtered_children = [
            child.filter(condition) for child in self.original_strategies
        ]
        return FilteredStrategy(OneOfStrategy(filtered_children), conditions=())

880 

881 

@overload
def one_of(
    __args: Sequence[SearchStrategy[Ex]],
) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex], __a2: SearchStrategy[T]
) -> SearchStrategy[Union[Ex, T]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3]
) -> SearchStrategy[Union[Ex, T, T3]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
) -> SearchStrategy[Union[Ex, T, T3, T4]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
    __a5: SearchStrategy[T5],
) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]:  # pragma: no cover
    ...


@overload
def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]:  # pragma: no cover
    ...


@defines_strategy(never_lazy=True)
def one_of(
    *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
) -> SearchStrategy[Any]:
    # Mypy workaround alert:  Any is too loose above; the return parameter
    # should be the union of the input parameters.  Unfortunately, Mypy <=0.600
    # raises errors due to incompatible inputs instead.  See #1270 for links.
    # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
    """Return a strategy which generates values from any of the argument
    strategies.

    This may be called with one iterable argument instead of multiple
    strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
    equivalent.

    Examples from this strategy will generally shrink to ones that come from
    strategies earlier in the list, then shrink according to behaviour of the
    strategy that produced them. In order to get good shrinking behaviour,
    try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
    better than ``one_of(text(), none())``.

    This is especially important when using recursive strategies. e.g.
    ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
    but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
    very badly indeed.
    """
    if len(args) == 1:
        (only,) = args
        if not isinstance(only, SearchStrategy):
            # A lone non-strategy argument may be an iterable of strategies;
            # unpack it if it is iterable, otherwise leave args untouched.
            try:
                args = tuple(only)
            except TypeError:
                pass

    if len(args) == 1 and isinstance(args[0], SearchStrategy):
        # With exactly one strategy there is nothing to choose between, so
        # hand it back unchanged: no overhead, and the repr stays simple.
        return args[0]

    if args and all(not isinstance(a, SearchStrategy) for a in args):
        # None of the arguments is a strategy, which suggests `one_of()` was
        # confused with `sampled_from()`; give a more specific error here and
        # leave the remaining validation to OneOfStrategy.  See PR #2627.
        raise InvalidArgument(
            f"Did you mean st.sampled_from({list(args)!r})?  st.one_of() is used "
            "to combine strategies, but all of the arguments were of other types."
        )

    # The one-element-sequence form [(s1, s2, ...)] was unpacked above, so what
    # remains is treated as an actual sequence of strategies.
    return OneOfStrategy(cast(Sequence[SearchStrategy], args))

982 

983 

class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]):
    """A strategy which is defined purely by conversion to and from another
    strategy.

    Its parameter and distribution come from that other strategy.
    """

    def __init__(
        self,
        strategy: SearchStrategy[MappedFrom],
        pack: Callable[[MappedFrom], MappedTo],
    ) -> None:
        """Wrap ``strategy`` so that every drawn value is passed through ``pack``."""
        super().__init__()
        self.mapped_strategy = strategy
        self.pack = pack

    def calc_is_empty(self, recur: RecurT) -> bool:
        """Delegate emptiness to the wrapped strategy."""
        return recur(self.mapped_strategy)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        """Delegate cacheability to the wrapped strategy."""
        return recur(self.mapped_strategy)

    def __repr__(self) -> str:
        # The repr is computed once and then stored on the instance.
        if not hasattr(self, "_cached_repr"):
            inner = repr(self.mapped_strategy)
            described = get_pretty_function_description(self.pack)
            self._cached_repr = f"{inner}.map({described})"
        return self._cached_repr

    def do_validate(self) -> None:
        """Validate the wrapped strategy."""
        self.mapped_strategy.validate()

    def do_draw(self, data: ConjectureData) -> MappedTo:
        """Draw from the wrapped strategy and return ``pack`` applied to it.

        Up to three attempts are made; an attempt that raises
        ``UnsatisfiedAssumption`` has its span discarded and is retried.
        If all three fail, ``UnsatisfiedAssumption`` is raised.
        """
        with warnings.catch_warnings():
            pack_is_mapping_or_set = isinstance(self.pack, type) and issubclass(
                self.pack, (abc.Mapping, abc.Set)
            )
            if pack_is_mapping_or_set:
                # NOTE(review): presumably silences BytesWarning emitted while
                # building the mapping/set from drawn elements - confirm.
                warnings.simplefilter("ignore", BytesWarning)
            for _attempt in range(3):
                try:
                    data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
                    drawn = data.draw(self.mapped_strategy)
                    result = self.pack(drawn)
                    data.stop_span()
                    current_build_context().record_call(
                        result, self.pack, [drawn], {}
                    )
                    return result
                except UnsatisfiedAssumption:
                    data.stop_span(discard=True)
            raise UnsatisfiedAssumption

    @property
    def branches(self) -> Sequence[SearchStrategy[MappedTo]]:
        """One ``MappedStrategy`` with the same ``pack`` per wrapped branch."""
        return [
            MappedStrategy(branch, pack=self.pack)
            for branch in self.mapped_strategy.branches
        ]

    def filter(
        self, condition: Callable[[MappedTo], Any]
    ) -> "SearchStrategy[MappedTo]":
        """Filter this strategy, rewriting the inner list strategy if possible.

        The rewrite is attempted only when the wrapped strategy is a
        ``ListStrategy`` and ``pack`` is a collection type or one of the
        known collection-building functions, since most collections are
        `st.lists(...).map(the_type)`.
        """
        ListStrategy = _list_strategy_type()
        if not isinstance(self.mapped_strategy, ListStrategy):
            return super().filter(condition)

        pack_is_collection_type = isinstance(self.pack, type) and issubclass(
            self.pack, abc.Collection
        )
        if not (pack_is_collection_type or self.pack in _collection_ish_functions()):
            return super().filter(condition)

        # Ask the inner list strategy whether it can absorb this condition.
        # If it just wrapped itself in a filter, throw that result away and
        # apply _only_ a new outer filter instead.
        rewritten = ListStrategy.filter(self.mapped_strategy, condition)
        if getattr(rewritten, "filtered_strategy", None) is self.mapped_strategy:
            return super().filter(condition)  # didn't rewrite

        # The inner strategy was rewritten, but the outer filter is still
        # needed: some collections can change the list length (dict, set, etc).
        remapped = type(self)(rewritten, self.pack)
        return FilteredStrategy(remapped, conditions=(condition,))

1060 

1061 

@lru_cache
def _list_strategy_type() -> Any:
    """Return the ``ListStrategy`` class, imported on first call and cached."""
    from hypothesis.strategies._internal.collections import (
        ListStrategy as list_strategy_cls,
    )

    return list_strategy_cls

1067 

1068 

def _collection_ish_functions() -> Sequence[Any]:
    """Return callables that are treated like collection constructors.

    The result always starts with the builtin ``sorted``.  If NumPy has
    already been imported (it is looked up in ``sys.modules``, never imported
    here), the array-creation functions listed below are appended in order.

    Each NumPy name is resolved defensively: a function that is missing from
    the loaded NumPy version is skipped rather than raising
    ``AttributeError``, so one removed alias cannot break every caller.
    """
    funcs: list[Any] = [sorted]
    if np := sys.modules.get("numpy"):
        # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
        # Probably only `np.array` and `np.asarray` will be used in practice,
        # but why should that stop us when we've already gone this far?
        numpy_names = (
            "empty_like",
            "eye",
            "identity",
            "ones_like",
            "zeros_like",
            "array",
            "asarray",
            "asanyarray",
            "ascontiguousarray",
            "asmatrix",
            "copy",
            "rec.array",
            "rec.fromarrays",
            "rec.fromrecords",
            "diag",
            # bonus undocumented functions from tab-completion:
            "asarray_chkfinite",
            "asfortranarray",
        )
        for dotted_name in numpy_names:
            target: Any = np
            for part in dotted_name.split("."):
                target = getattr(target, part, None)
                if target is None:
                    # Not provided by this NumPy version; skip it.
                    break
            else:
                funcs.append(target)

    return funcs

1097 

1098 

# Sentinel returned by ``do_filtered_draw`` when no drawn value satisfied the
# filter; callers compare against it by identity (``is`` / ``is not``).
filter_not_satisfied = UniqueIdentifier("filter not satisfied")

1100 

1101 

class FilteredStrategy(SearchStrategy[Ex]):
    """Draws from ``filtered_strategy``, keeping only values for which every
    predicate in ``flat_conditions`` is truthy.
    """

    def __init__(
        self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...]
    ):
        """Store the base strategy and predicates, flattening chained filters.

        Also re-invoked by ``do_validate`` to re-initialize the instance in
        place.
        """
        super().__init__()
        wraps_filter = isinstance(strategy, FilteredStrategy)
        # A filter of a filter collapses into one filter with more conditions.
        self.flat_conditions: tuple[Callable[[Ex], Any], ...] = (
            strategy.flat_conditions + conditions if wraps_filter else conditions
        )
        self.filtered_strategy: SearchStrategy[Ex] = (
            strategy.filtered_strategy if wraps_filter else strategy
        )

        assert isinstance(self.flat_conditions, tuple)
        assert not isinstance(self.filtered_strategy, FilteredStrategy)

        # Combined predicate, built lazily by the ``condition`` property.
        self.__condition: Optional[Callable[[Ex], Any]] = None

    def calc_is_empty(self, recur: RecurT) -> bool:
        """Delegate emptiness to the base strategy."""
        return recur(self.filtered_strategy)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        """Delegate cacheability to the base strategy."""
        return recur(self.filtered_strategy)

    def __repr__(self) -> str:
        # The repr is computed once and then stored on the instance.
        if not hasattr(self, "_cached_repr"):
            filter_calls = "".join(
                f".filter({get_pretty_function_description(cond)})"
                for cond in self.flat_conditions
            )
            self._cached_repr = "{!r}{}".format(self.filtered_strategy, filter_calls)
        return self._cached_repr

    def do_validate(self) -> None:
        """Validate the base strategy, then replay the predicates against it."""
        # Validate the inner strategy first.  If it was a LazyStrategy, this
        # also reifies it, so later `.filter()` calls are passed through.
        self.filtered_strategy.validate()
        # With a reified inner strategy in hand, replay every saved predicate
        # in case some or all of them can now be rewritten.  Note that each
        # step replaces `replayed` too!
        replayed = self.filtered_strategy
        for cond in self.flat_conditions:
            replayed = replayed.filter(cond)
        if isinstance(replayed, FilteredStrategy):
            # Some predicates were not rewritten away: adopt what remains.
            base, remaining = replayed.filtered_strategy, replayed.flat_conditions
        else:
            # *All* predicates were rewritten; do_validate() is an in-place
            # method, so this instance is still re-initialized, with none left.
            base, remaining = replayed, ()
        FilteredStrategy.__init__(self, base, remaining)

    def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]":
        """Return a new filtered strategy that additionally enforces ``condition``."""
        # Predicate order doesn't matter (`f(x) and g(x) == g(x) and f(x)`), so
        # try to push the new condition into the base strategy as the
        # inner-most filter, where it may be rewritten into a more efficient form.
        pushed = self.filtered_strategy.filter(condition)
        if not isinstance(pushed, FilteredStrategy):
            # Rewritten: keep the more efficient base, conditions unchanged.
            return FilteredStrategy(pushed, self.flat_conditions)
        # Not rewritten: merge both condition tuples, newest last.
        merged = self.flat_conditions + pushed.flat_conditions
        return FilteredStrategy(pushed.filtered_strategy, merged)

    @property
    def condition(self) -> Callable[[Ex], Any]:
        """A single predicate equivalent to all of ``flat_conditions``."""
        if self.__condition is not None:
            return self.__condition

        count = len(self.flat_conditions)
        if count == 1:
            # Common case: use the lone predicate directly, no extra indirection.
            self.__condition = self.flat_conditions[0]
        elif count == 0:
            # Possible, if unlikely, due to filter predicate rewriting
            self.__condition = lambda _: True  # type: ignore # covariant type param
        else:
            self.__condition = lambda x: all(  # type: ignore # covariant type param
                cond(x) for cond in self.flat_conditions
            )
        return self.__condition

    def do_draw(self, data: ConjectureData) -> Ex:
        """Return a value passing the filter, or mark ``data`` invalid."""
        outcome = self.do_filtered_draw(data)
        if outcome is not filter_not_satisfied:
            return cast(Ex, outcome)

        data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")

    def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
        """Try up to three draws; return ``filter_not_satisfied`` if all fail."""
        for attempt in range(3):
            data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
            candidate = data.draw(self.filtered_strategy)
            if self.condition(candidate):
                data.stop_span()
                return candidate
            data.stop_span(discard=True)
            if attempt == 0:
                # Note the retry once, after the first rejected draw.
                data.events[f"Retried draw from {self!r} to satisfy filter"] = ""

        return filter_not_satisfied

    @property
    def branches(self) -> Sequence[SearchStrategy[Ex]]:
        """Each branch of the base strategy, filtered by the same conditions."""
        return [
            FilteredStrategy(strategy=branch, conditions=self.flat_conditions)
            for branch in self.filtered_strategy.branches
        ]

1218 

1219 

@check_function
def check_strategy(arg: object, name: str = "") -> None:
    """Raise ``InvalidArgument`` unless ``arg`` is a ``SearchStrategy``.

    ``name``, when non-empty, is the argument name shown in the error
    message.  For lists and tuples the message also suggests
    ``st.sampled_from``.
    """
    assert isinstance(name, str)
    if isinstance(arg, SearchStrategy):
        return

    hint = (
        ", such as st.sampled_from({}),".format(name or "...")
        if isinstance(arg, (list, tuple))
        else ""
    )
    if name:
        name = name + "="
    raise InvalidArgument(
        f"Expected a SearchStrategy{hint} but got {name}{arg!r} "
        f"(type={type(arg).__name__})"
    )