Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/hypothesis/strategies/_internal/strategies.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

472 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import sys 

12import warnings 

13from collections import abc, defaultdict 

14from collections.abc import Sequence 

15from functools import lru_cache 

16from random import shuffle 

17from typing import ( 

18 TYPE_CHECKING, 

19 Any, 

20 Callable, 

21 ClassVar, 

22 Generic, 

23 Literal, 

24 Optional, 

25 TypeVar, 

26 Union, 

27 cast, 

28 overload, 

29) 

30 

31from hypothesis._settings import HealthCheck, Phase, Verbosity, settings 

32from hypothesis.control import _current_build_context, current_build_context 

33from hypothesis.errors import ( 

34 HypothesisException, 

35 HypothesisWarning, 

36 InvalidArgument, 

37 NonInteractiveExampleWarning, 

38 UnsatisfiedAssumption, 

39) 

40from hypothesis.internal.conjecture import utils as cu 

41from hypothesis.internal.conjecture.data import ConjectureData 

42from hypothesis.internal.conjecture.utils import ( 

43 calc_label_from_cls, 

44 calc_label_from_hash, 

45 calc_label_from_name, 

46 combine_labels, 

47) 

48from hypothesis.internal.coverage import check_function 

49from hypothesis.internal.reflection import ( 

50 get_pretty_function_description, 

51 is_identity_function, 

52) 

53from hypothesis.strategies._internal.utils import defines_strategy 

54from hypothesis.utils.conventions import UniqueIdentifier 

55 

# ``default=`` on a TypeVar is only passed on the branch that type checkers
# evaluate; the runtime branch constructs the same covariant TypeVar without it.
if TYPE_CHECKING:
    from typing import TypeAlias

    Ex = TypeVar("Ex", covariant=True, default=Any)
else:
    Ex = TypeVar("Ex", covariant=True)

# Generic placeholders used by the signatures in this module.
T = TypeVar("T")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
MappedFrom = TypeVar("MappedFrom")
MappedTo = TypeVar("MappedTo")

# Signature of the ``recur`` callback handed to the ``calc_*`` methods.
RecurT: "TypeAlias" = Callable[["SearchStrategy"], bool]

# Marker stored while a value is still being computed; it is compared by
# identity in ``recursive_property`` and ``SearchStrategy.label``.
calculating = UniqueIdentifier("calculating")

# Labels derived from fixed descriptive names.
MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "another attempted draw in MappedStrategy"
)

FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "single loop iteration in FilteredStrategy"
)

79 

80 

81def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any: 

82 """Handle properties which may be mutually recursive among a set of 

83 strategies. 

84 

85 These are essentially lazily cached properties, with the ability to set 

86 an override: If the property has not been explicitly set, we calculate 

87 it on first access and memoize the result for later. 

88 

89 The problem is that for properties that depend on each other, a naive 

90 calculation strategy may hit infinite recursion. Consider for example 

91 the property is_empty. A strategy defined as x = st.deferred(lambda: x) 

92 is certainly empty (in order to draw a value from x we would have to 

93 draw a value from x, for which we would have to draw a value from x, 

94 ...), but in order to calculate it the naive approach would end up 

95 calling x.is_empty in order to calculate x.is_empty in order to etc. 

96 

97 The solution is one of fixed point calculation. We start with a default 

98 value that is the value of the property in the absence of evidence to 

99 the contrary, and then update the values of the property for all 

100 dependent strategies until we reach a fixed point. 

101 

102 The approach taken roughly follows that in section 4.2 of Adams, 

103 Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity 

104 and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6 

105 (2016): 224-236. 

106 """ 

107 cache_key = "cached_" + name 

108 calculation = "calc_" + name 

109 force_key = "force_" + name 

110 

111 def forced_value(target: SearchStrategy) -> Any: 

112 try: 

113 return getattr(target, force_key) 

114 except AttributeError: 

115 return getattr(target, cache_key) 

116 

117 try: 

118 return forced_value(strategy) 

119 except AttributeError: 

120 pass 

121 

122 mapping: dict[SearchStrategy, Any] = {} 

123 sentinel = object() 

124 hit_recursion = False 

125 

126 # For a first pass we do a direct recursive calculation of the 

127 # property, but we block recursively visiting a value in the 

128 # computation of its property: When that happens, we simply 

129 # note that it happened and return the default value. 

130 def recur(strat: SearchStrategy) -> Any: 

131 nonlocal hit_recursion 

132 try: 

133 return forced_value(strat) 

134 except AttributeError: 

135 pass 

136 result = mapping.get(strat, sentinel) 

137 if result is calculating: 

138 hit_recursion = True 

139 return default 

140 elif result is sentinel: 

141 mapping[strat] = calculating 

142 mapping[strat] = getattr(strat, calculation)(recur) 

143 return mapping[strat] 

144 return result 

145 

146 recur(strategy) 

147 

148 # If we hit self-recursion in the computation of any strategy 

149 # value, our mapping at the end is imprecise - it may or may 

150 # not have the right values in it. We now need to proceed with 

151 # a more careful fixed point calculation to get the exact 

152 # values. Hopefully our mapping is still pretty good and it 

153 # won't take a large number of updates to reach a fixed point. 

154 if hit_recursion: 

155 needs_update = set(mapping) 

156 

157 # We track which strategies use which in the course of 

158 # calculating their property value. If A ever uses B in 

159 # the course of calculating its value, then whenever the 

160 # value of B changes we might need to update the value of 

161 # A. 

162 listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set) 

163 else: 

164 needs_update = None 

165 

166 def recur2(strat: SearchStrategy) -> Any: 

167 def recur_inner(other: SearchStrategy) -> Any: 

168 try: 

169 return forced_value(other) 

170 except AttributeError: 

171 pass 

172 listeners[other].add(strat) 

173 result = mapping.get(other, sentinel) 

174 if result is sentinel: 

175 assert needs_update is not None 

176 needs_update.add(other) 

177 mapping[other] = default 

178 return default 

179 return result 

180 

181 return recur_inner 

182 

183 count = 0 

184 seen = set() 

185 while needs_update: 

186 count += 1 

187 # If we seem to be taking a really long time to stabilize we 

188 # start tracking seen values to attempt to detect an infinite 

189 # loop. This should be impossible, and most code will never 

190 # hit the count, but having an assertion for it means that 

191 # testing is easier to debug and we don't just have a hung 

192 # test. 

193 # Note: This is actually covered, by test_very_deep_deferral 

194 # in tests/cover/test_deferred_strategies.py. Unfortunately it 

195 # runs into a coverage bug. See 

196 # https://github.com/nedbat/coveragepy/issues/605 

197 # for details. 

198 if count > 50: # pragma: no cover 

199 key = frozenset(mapping.items()) 

200 assert key not in seen, (key, name) 

201 seen.add(key) 

202 to_update = needs_update 

203 needs_update = set() 

204 for strat in to_update: 

205 new_value = getattr(strat, calculation)(recur2(strat)) 

206 if new_value != mapping[strat]: 

207 needs_update.update(listeners[strat]) 

208 mapping[strat] = new_value 

209 

210 # We now have a complete and accurate calculation of the 

211 # property values for everything we have seen in the course of 

212 # running this calculation. We simultaneously update all of 

213 # them (not just the strategy we started out with). 

214 for k, v in mapping.items(): 

215 setattr(k, cache_key, v) 

216 return getattr(strategy, cache_key) 

217 

218 

class SearchStrategy(Generic[Ex]):
    """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input.

    This class is only part of the public API for use in type annotations, so that
    you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns
    ``builds(Foo, ...)``.  Do not inherit from or directly instantiate this class.
    """

    validate_called: bool = False
    __label: Union[int, UniqueIdentifier, None] = None
    __module__: str = "hypothesis.strategies"

    # Per-class cache of the value returned by ``class_label``.
    LABELS: ClassVar[dict[type, int]] = {}

    def _available(self, data: ConjectureData) -> bool:
        """Return whether this strategy can *currently* draw any values.

        This is typically useful for stateful testing, where a ``Bundle``
        grows a list of values to choose from over time.

        Unlike the ``is_empty`` property, the return value of this method may
        change over time.

        Note: ``data`` is only used for introspection; no value is drawn
        from it.
        """
        return not self.is_empty

    @property
    def is_empty(self) -> Any:
        # True when this strategy can never produce a value, so any draw
        # would mark the data invalid.  False is *not* a promise that a draw
        # succeeds: this is a best-effort answer meant mainly as an
        # optimisation for some cases.
        return recursive_property(self, "is_empty", True)

    @property
    def supports_find(self) -> bool:
        return True

    # True if values from this strategy can be implicitly reused (e.g. as
    # background values in a numpy array) without causing surprising
    # user-visible behaviour. Should be false for built-in strategies that
    # produce mutable values, and for strategies that have been mapped/filtered
    # by arbitrary user-provided functions.
    @property
    def has_reusable_values(self) -> Any:
        return recursive_property(self, "has_reusable_values", True)

    # Whether this strategy is suitable for holding onto in a cache.
    @property
    def is_cacheable(self) -> Any:
        return recursive_property(self, "is_cacheable", True)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return True

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Deliberately False even though ``is_empty`` starts from a default
        # of True: a strategy counts as empty until shown otherwise, but
        # most basic strategies are trivially non-empty and should not have
        # to override this method just to say so.
        return False

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        return False

    def example(self) -> Ex:  # FIXME
        """Provide an example of the sort of value that this strategy generates.

        This method is designed for use in a REPL, and will raise an error if
        called from inside |@given| or a strategy definition.  For serious use,
        see |@composite| or |st.data|.
        """
        if getattr(sys, "ps1", None) is None:  # pragma: no branch
            # The other branch *is* covered in cover/test_examples.py; but as that
            # uses `pexpect` for an interactive session `coverage` doesn't see it.
            warnings.warn(
                "The `.example()` method is good for exploring strategies, but should "
                "only be used interactively.  We recommend using `@given` for tests - "
                "it performs better, saves and replays failures to avoid flakiness, "
                f"and reports minimal examples. (strategy: {self!r})",
                NonInteractiveExampleWarning,
                stacklevel=2,
            )

        context = _current_build_context.value
        if context is not None:
            # A build context with a positive draw depth means we are in the
            # middle of a strategy definition; otherwise we are in a test body.
            if context.data is not None and context.data.depth > 0:
                message = (
                    "Using example() inside a strategy definition is a bad "
                    "idea. Instead consider using hypothesis.strategies.builds() "
                    "or @hypothesis.strategies.composite to define your strategy."
                    " See https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#hypothesis.strategies.builds or "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#composite-strategies for more details."
                )
            else:
                message = (
                    "Using example() inside a test function is a bad "
                    "idea. Instead consider using hypothesis.strategies.data() "
                    "to draw more examples during testing. See "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#drawing-interactively-in-tests for more details."
                )
            raise HypothesisException(message)

        # Serve from the buffer left by an earlier call; a missing or
        # exhausted buffer falls through to generating a fresh batch.
        try:
            return self.__examples.pop()
        except (AttributeError, IndexError):
            self.__examples: list[Ex] = []

        from hypothesis.core import given

        # Note: this function has a weird name because it might appear in
        # tracebacks, and we want users to know that they can ignore it.
        @given(self)
        @settings(
            database=None,
            # generate only a few examples at a time to avoid slow interactivity
            # for large strategies. The overhead of @given is very small relative
            # to generation, so a small batch size is fine.
            max_examples=10,
            deadline=None,
            verbosity=Verbosity.quiet,
            phases=(Phase.generate,),
            suppress_health_check=list(HealthCheck),
        )
        def example_generating_inner_function(
            ex: Ex,  # type: ignore # mypy is overzealous in preventing covariant params
        ) -> None:
            self.__examples.append(ex)

        example_generating_inner_function()
        shuffle(self.__examples)
        return self.__examples.pop()

    def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
        """Returns a new strategy which generates a value from this one, and
        then returns ``pack(value)``.  For example, ``integers().map(str)``
        could generate ``str(5)`` == ``"5"``.
        """
        if not is_identity_function(pack):
            return MappedStrategy(self, pack=pack)
        return self  # type: ignore # Mypy has no way to know that `Ex == T`

    def flatmap(
        self, expand: Callable[[Ex], "SearchStrategy[T]"]
    ) -> "SearchStrategy[T]":  # FIXME
        """Old syntax for a special case of |@composite|:

        .. code-block:: python

            @st.composite
            def flatmap_like(draw, base_strategy, expand):
                value = draw(base_strategy)
                new_strategy = expand(value)
                return draw(new_strategy)

        We find that the greater readability of |@composite| usually outweighs
        the verbosity, with a few exceptions for simple cases or recipes like
        ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for
        any instance of that type, and then generate one of those").
        """
        from hypothesis.strategies._internal.flatmapped import FlatMapStrategy

        return FlatMapStrategy(self, expand=expand)

    # The type of ``condition`` used to be factored out into a ``PredicateT``
    # alias.  That only works while the generic parameters need no relation
    # to another parameter or to the return value.  Here the argument of
    # ``condition`` must be this strategy's own ``Ex``, which requires spelling
    # out ``Callable[[Ex], Any]`` in full: a TypeAlias is *not* a textual macro
    # and does not pick up the TypeVar context of the place where it is used.
    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
        """Returns a new strategy that generates values from this strategy
        which satisfy the provided condition.

        Note that if the condition is too hard to satisfy this might result
        in your tests failing with an Unsatisfiable exception.
        A basic version of the filtering logic would look something like:

        .. code-block:: python

            @st.composite
            def filter_like(draw, strategy, condition):
                for _ in range(3):
                    value = draw(strategy)
                    if condition(value):
                        return value
                assume(False)
        """
        return FilteredStrategy(strategy=self, conditions=(condition,))

    def _filter_for_filtered_draw(
        self, condition: Callable[[Ex], Any]
    ) -> "FilteredStrategy[Ex]":
        # Hook for parent strategies (e.g. UniqueListStrategy) that want to
        # apply a fallible filter to one of their inner strategies.  The
        # object returned must offer `.do_filtered_draw(data)`, which acts
        # like `do_draw` except that it returns the sentinel
        # `filter_not_satisfied` when the condition could not be met.
        #
        # Kept apart from `filter` so that a strategy may override `filter`
        # without also having to provide `do_filtered_draw`.
        return FilteredStrategy(strategy=self, conditions=(condition,))

    @property
    def branches(self) -> Sequence["SearchStrategy[Ex]"]:
        return [self]

    def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
        """Return a strategy which produces values by randomly drawing from one
        of this strategy or the other strategy.

        This method is part of the public API.
        """
        if not isinstance(other, SearchStrategy):
            raise ValueError(f"Cannot | a SearchStrategy with {other!r}")

        # Flatten operands that are themselves explicit unions, so that
        # st.integers() | st.integers() | st.integers() becomes
        #
        #   one_of(integers(), integers(), integers())
        #
        # rather than
        #
        #   one_of(one_of(integers(), integers()), integers())
        #
        # This is cosmetic only (e.g. for reprs): drawing goes through
        # .branches / .element_strategies, so correctness does not rely on it.
        flattened: list[SearchStrategy] = []
        for operand in (self, other):
            if isinstance(operand, OneOfStrategy):
                flattened.extend(operand.original_strategies)
            else:
                flattened.append(operand)
        return OneOfStrategy(flattened)

    def __bool__(self) -> bool:
        warnings.warn(
            f"bool({self!r}) is always True, did you mean to draw a value?",
            HypothesisWarning,
            stacklevel=2,
        )
        return True

    def validate(self) -> None:
        """Throw an exception if the strategy is not valid.

        This can happen due to lazy construction
        """
        if not self.validate_called:
            try:
                # Set the flag before doing the work, and roll it back if
                # anything below raises.
                self.validate_called = True
                self.do_validate()
                self.is_empty
                self.has_reusable_values
            except Exception:
                self.validate_called = False
                raise

    @property
    def class_label(self) -> int:
        cls = self.__class__
        registry = cls.LABELS
        if cls not in registry:
            registry[cls] = calc_label_from_cls(cls)
        return registry[cls]

    @property
    def label(self) -> int:
        current = self.__label
        if current is calculating:
            # Re-entered while our own label is still being computed.
            return 0
        if current is None:
            self.__label = calculating
            self.__label = self.calc_label()
        return cast(int, self.__label)

    def calc_label(self) -> int:
        return self.class_label

    def do_validate(self) -> None:
        pass

    def do_draw(self, data: ConjectureData) -> Ex:
        raise NotImplementedError(f"{type(self).__name__}.do_draw")

519 

520 

def is_hashable(value: object) -> bool:
    """Return True if ``hash(value)`` succeeds, False if it raises TypeError."""
    try:
        hash(value)
    except TypeError:
        return False
    else:
        return True

527 

528 

class SampledFromStrategy(SearchStrategy[Ex]):
    """A strategy which samples from a set of elements. This is essentially
    equivalent to using a OneOfStrategy over Just strategies but may be more
    efficient and convenient.

    ``map`` and ``filter`` are overridden: instead of wrapping this strategy
    they return a new instance of the same class whose ``_transformations``
    tuple has the function appended.  The recorded transformations are
    applied, in order, to each sampled element by ``_transform``.
    """

    # Cap used by ``do_filtered_draw`` to bound how many elements it examines.
    # NOTE(review): the ``- 3`` applied there presumably accounts for the three
    # rejection-sampling attempts made first - confirm.
    _MAX_FILTER_CALLS: ClassVar[int] = 10_000

    def __init__(
        self,
        elements: Sequence[Ex],
        repr_: Optional[str] = None,
        transformations: tuple[
            tuple[Literal["filter", "map"], Callable[[Ex], Any]],
            ...,
        ] = (),
    ):
        """Store the elements to sample from.

        :param elements: passed through ``cu.check_sample``; the result must
            be non-empty.
        :param repr_: optional replacement for the ``sampled_from([...])``
            prefix of ``repr(self)``.
        :param transformations: ``("map", f)`` / ``("filter", f)`` pairs that
            are applied, in order, to every sampled element.
        """
        super().__init__()
        self.elements = cu.check_sample(elements, "sampled_from")
        assert self.elements
        self.repr_ = repr_
        self._transformations = transformations

    def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]:
        """Return a copy of this strategy with ``("map", pack)`` appended."""
        s = type(self)(
            self.elements,
            repr_=self.repr_,
            transformations=(*self._transformations, ("map", pack)),
        )
        # guaranteed by the ("map", pack) transformation
        return cast(SearchStrategy[T], s)

    def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
        """Return a copy of this strategy with ``("filter", condition)`` appended."""
        return type(self)(
            self.elements,
            repr_=self.repr_,
            transformations=(*self._transformations, ("filter", condition)),
        )

    def __repr__(self) -> str:
        # ``repr_`` (when truthy) replaces only the ``sampled_from([...])``
        # part; the ``.map(...)`` / ``.filter(...)`` suffixes are always added.
        return (
            self.repr_
            or "sampled_from(["
            + ", ".join(map(get_pretty_function_description, self.elements))
            + "])"
        ) + "".join(
            f".{name}({get_pretty_function_description(f)})"
            for name, f in self._transformations
        )

    def calc_label(self) -> int:
        # strategy.label is effectively an under-approximation of structural
        # equality (i.e., some strategies may have the same label when they are not
        # structurally identical). More importantly for calculating the
        # SampledFromStrategy label, we might have hash(s1) != hash(s2) even
        # when s1 and s2 are structurally identical. For instance:
        #
        #   s1 = st.sampled_from([st.none()])
        #   s2 = st.sampled_from([st.none()])
        #   assert hash(s1) != hash(s2)
        #
        # (see also test cases in test_labels.py).
        #
        # We therefore use the labels of any component strategies when calculating
        # our label, and only use the hash if it is not a strategy.
        #
        # That's the ideal, anyway. In reality the logic is more complicated than
        # necessary in order to be efficient in the presence of (very) large sequences:
        # * add an unabashed special case for range, to avoid iteration over an
        #   enormous range when we know it is entirely integers.
        # * if there is at least one strategy in self.elements, use strategy label,
        #   and the element hash otherwise.
        # * if there are no strategies in self.elements, take the hash of the
        #   entire sequence. This prevents worst-case performance of hashing each
        #   element when a hash of the entire sequence would have sufficed.
        #
        # The worst case performance of this scheme is
        # itertools.chain(range(2**100), [st.none()]), where it degrades to
        # hashing every int in the range.

        if isinstance(self.elements, range) or (
            is_hashable(self.elements)
            and not any(isinstance(e, SearchStrategy) for e in self.elements)
        ):
            return combine_labels(self.class_label, calc_label_from_hash(self.elements))

        labels = [self.class_label]
        for element in self.elements:
            # Unhashable elements contribute nothing to the label.
            if not is_hashable(element):
                continue

            labels.append(
                element.label
                if isinstance(element, SearchStrategy)
                else calc_label_from_hash(element)
            )

        return combine_labels(*labels)

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        # Because our custom .map/.filter implementations skip the normal
        # wrapper strategies (which would automatically return False for us),
        # we need to manually return False here if any transformations have
        # been applied.
        return not self._transformations

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return is_hashable(self.elements)

    def _transform(
        self,
        # https://github.com/python/mypy/issues/7049, we're not writing `element`
        # anywhere in the class so this is still type-safe. mypy is being more
        # conservative than necessary
        element: Ex,  # type: ignore
    ) -> Union[Ex, UniqueIdentifier]:
        """Apply the recorded transformations to ``element`` in order.

        Returns the transformed element, or ``filter_not_satisfied`` as soon
        as a ``"filter"`` function returns a falsy value.
        """
        # Used in UniqueSampledListStrategy
        for name, f in self._transformations:
            if name == "map":
                result = f(element)
                # Report the call to the active build context, if there is one.
                if build_context := _current_build_context.value:
                    build_context.record_call(result, f, [element], {})
                element = result
            else:
                assert name == "filter"
                if not f(element):
                    return filter_not_satisfied
        return element

    def do_draw(self, data: ConjectureData) -> Ex:
        """Draw one transformed element, marking ``data`` invalid if none passes."""
        result = self.do_filtered_draw(data)
        if isinstance(result, SearchStrategy) and all(
            isinstance(x, SearchStrategy) for x in self.elements
        ):
            # Every element is itself a strategy: leave a (format string,
            # argument) hint on ``data`` suggesting that ``one_of`` was meant.
            # The message names the strategy as ``sampled_from``, the same
            # name passed to ``check_sample`` in ``__init__``.
            data._sampled_from_all_strategies_elements_message = (
                "sampled_from was given a collection of strategies: "
                "{!r}. Was one_of intended?",
                self.elements,
            )
        if result is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
        assert not isinstance(result, UniqueIdentifier)
        return result

    def get_element(self, i: int) -> Union[Ex, UniqueIdentifier]:
        """Return ``self.elements[i]`` after applying the transformations."""
        return self._transform(self.elements[i])

    def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
        """Draw a transformed element, or return ``filter_not_satisfied``.

        Up to three random indices are tried first.  If none of them passes,
        a bounded prefix of the elements is scanned for ones that do, and one
        of those is chosen.
        """
        # Set of indices that have been tried so far, so that we never test
        # the same element twice during a draw.
        known_bad_indices: set[int] = set()

        # Start with ordinary rejection sampling. It's fast if it works, and
        # if it doesn't work then it was only a small amount of overhead.
        for _ in range(3):
            i = data.draw_integer(0, len(self.elements) - 1)
            if i not in known_bad_indices:
                element = self.get_element(i)
                if element is not filter_not_satisfied:
                    return element
                # Record the retry event only once per draw.
                if not known_bad_indices:
                    data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
                known_bad_indices.add(i)

        # If we've tried all the possible elements, give up now.
        max_good_indices = len(self.elements) - len(known_bad_indices)
        if not max_good_indices:
            return filter_not_satisfied

        # Impose an arbitrary cutoff to prevent us from wasting too much time
        # on very large element lists.
        max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)

        # Before building the list of allowed indices, speculatively choose
        # one of them. We don't yet know how many allowed indices there will be,
        # so this choice might be out-of-bounds, but that's OK.
        speculative_index = data.draw_integer(0, max_good_indices - 1)

        # Calculate the indices of allowed values, so that we can choose one
        # of them at random. But if we encounter the speculatively-chosen one,
        # just use that and return immediately.  Note that we also track the
        # allowed elements, in case of .map(some_stateful_function)
        allowed: list[tuple[int, Ex]] = []
        for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
            if i not in known_bad_indices:
                element = self.get_element(i)
                if element is not filter_not_satisfied:
                    assert not isinstance(element, UniqueIdentifier)
                    allowed.append((i, element))
                    if len(allowed) > speculative_index:
                        # Early-exit case: We reached the speculative index, so
                        # we just return the corresponding element.
                        data.draw_integer(0, len(self.elements) - 1, forced=i)
                        return element

        # The speculative index didn't work out, but at this point we've built
        # and can choose from the complete list of allowed indices and elements.
        if allowed:
            i, element = data.choice(allowed)
            data.draw_integer(0, len(self.elements) - 1, forced=i)
            return element
        # If there are no allowed indices, the filter couldn't be satisfied.
        return filter_not_satisfied

732 

733 

734class OneOfStrategy(SearchStrategy[Ex]): 

735 """Implements a union of strategies. Given a number of strategies this 

736 generates values which could have come from any of them. 

737 

738 The conditional distribution draws uniformly at random from some 

739 non-empty subset of these strategies and then draws from the 

740 conditional distribution of that strategy. 

741 """ 

742 

743 def __init__(self, strategies: Sequence[SearchStrategy[Ex]]): 

744 super().__init__() 

745 self.original_strategies = tuple(strategies) 

746 self.__element_strategies: Optional[Sequence[SearchStrategy[Ex]]] = None 

747 self.__in_branches = False 

748 

749 def calc_is_empty(self, recur: RecurT) -> bool: 

750 return all(recur(e) for e in self.original_strategies) 

751 

752 def calc_has_reusable_values(self, recur: RecurT) -> bool: 

753 return all(recur(e) for e in self.original_strategies) 

754 

755 def calc_is_cacheable(self, recur: RecurT) -> bool: 

756 return all(recur(e) for e in self.original_strategies) 

757 

758 @property 

759 def element_strategies(self) -> Sequence[SearchStrategy[Ex]]: 

760 if self.__element_strategies is None: 

761 # While strategies are hashable, they use object.__hash__ and are 

762 # therefore distinguished only by identity. 

763 # 

764 # In principle we could "just" define a __hash__ method 

765 # (and __eq__, but that's easy in terms of type() and hash()) 

766 # to make this more powerful, but this is harder than it sounds: 

767 # 

768 # 1. Strategies are often distinguished by non-hashable attributes, 

769 # or by attributes that have the same hash value ("^.+" / b"^.+"). 

770 # 2. LazyStrategy: can't reify the wrapped strategy without breaking 

771 # laziness, so there's a hash each for the lazy and the nonlazy. 

772 # 

773 # Having made several attempts, the minor benefits of making strategies 

774 # hashable are simply not worth the engineering effort it would take. 

775 # See also issues #2291 and #2327. 

776 seen: set[SearchStrategy] = {self} 

777 strategies: list[SearchStrategy] = [] 

778 for arg in self.original_strategies: 

779 check_strategy(arg) 

780 if not arg.is_empty: 

781 for s in arg.branches: 

782 if s not in seen and not s.is_empty: 

783 seen.add(s) 

784 strategies.append(s) 

785 self.__element_strategies = strategies 

786 return self.__element_strategies 

787 

788 def calc_label(self) -> int: 

789 return combine_labels( 

790 self.class_label, *(p.label for p in self.original_strategies) 

791 ) 

792 

793 def do_draw(self, data: ConjectureData) -> Ex: 

794 strategy = data.draw( 

795 SampledFromStrategy(self.element_strategies).filter( 

796 lambda s: s._available(data) 

797 ) 

798 ) 

799 return data.draw(strategy) 

800 

801 def __repr__(self) -> str: 

802 return "one_of({})".format(", ".join(map(repr, self.original_strategies))) 

803 

804 def do_validate(self) -> None: 

805 for e in self.element_strategies: 

806 e.validate() 

807 

808 @property 

809 def branches(self) -> Sequence[SearchStrategy[Ex]]: 

810 if not self.__in_branches: 

811 try: 

812 self.__in_branches = True 

813 return self.element_strategies 

814 finally: 

815 self.__in_branches = False 

816 else: 

817 return [self] 

818 

def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
    """Filter each original strategy individually.

    ``condition`` is applied to every strategy in ``original_strategies``
    via its own ``.filter``; the results are recombined into a new
    ``OneOfStrategy`` which is wrapped in a ``FilteredStrategy`` that
    carries no conditions of its own.
    """
    filtered_children = [
        child.filter(condition) for child in self.original_strategies
    ]
    return FilteredStrategy(OneOfStrategy(filtered_children), conditions=())

824 

825 

# Typing-only signatures for ``one_of``: a single sequence of strategies,
# one to five positional strategies (returning the union of their element
# types), and a catch-all for any number of strategies.


@overload
def one_of(
    __args: Sequence[SearchStrategy[Ex]],
) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
) -> SearchStrategy[Union[Ex, T]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
) -> SearchStrategy[Union[Ex, T, T3]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
) -> SearchStrategy[Union[Ex, T, T3, T4]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
    __a5: SearchStrategy[T5],
) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]:  # pragma: no cover
    ...


@overload
def one_of(
    *args: SearchStrategy[Any],
) -> SearchStrategy[Any]:  # pragma: no cover
    ...

876 

877 

@defines_strategy(never_lazy=True)
def one_of(
    *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
) -> SearchStrategy[Any]:
    """Return a strategy which generates values from any of the argument
    strategies.

    This may be called with one iterable argument instead of multiple
    strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
    equivalent.

    Examples from this strategy will generally shrink to ones that come from
    strategies earlier in the list, then shrink according to behaviour of the
    strategy that produced them. In order to get good shrinking behaviour,
    try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
    better than ``one_of(text(), none())``.

    This is especially important when using recursive strategies. e.g.
    ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
    but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
    very badly indeed.
    """
    # Mypy workaround alert: Any is too loose in the signature; the return
    # parameter should be the union of the input parameters. Unfortunately,
    # Mypy <=0.600 raises errors due to incompatible inputs instead. See #1270
    # for links. v0.610 doesn't error; it gets inference wrong for 2+
    # arguments instead.
    if len(args) == 1:
        (only,) = args
        if not isinstance(only, SearchStrategy):
            # A lone non-strategy argument is treated as an iterable of
            # strategies; if it cannot be iterated we leave ``args`` alone
            # and let the checks below report the problem.
            try:
                args = tuple(only)
            except TypeError:
                pass

    if len(args) == 1 and isinstance(args[0], SearchStrategy):
        # With exactly one strategy there is nothing to choose between, so
        # hand it back unchanged: no overhead for one-element lists, and the
        # repr stays simple.
        return args[0]

    if args and all(not isinstance(a, SearchStrategy) for a in args):
        # Give a more-specific error message if it seems that the user has
        # confused `one_of()` for `sampled_from()`; the remaining validation
        # is left to OneOfStrategy. See PR #2627.
        raise InvalidArgument(
            f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used "
            "to combine strategies, but all of the arguments were of other types."
        )

    # The single-iterable form was unpacked above, so from here on ``args``
    # is taken to be a plain sequence of strategies.
    return OneOfStrategy(cast(Sequence[SearchStrategy], args))

926 

927 

class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]):
    """A strategy defined by passing another strategy's values through a
    function.

    Values are drawn from ``mapped_strategy`` and converted with ``pack``;
    emptiness, cacheability and validation are all delegated to the wrapped
    strategy.
    """

    def __init__(
        self,
        strategy: SearchStrategy[MappedFrom],
        pack: Callable[[MappedFrom], MappedTo],
    ) -> None:
        super().__init__()
        self.mapped_strategy = strategy
        self.pack = pack

    def calc_is_empty(self, recur: RecurT) -> bool:
        """Empty exactly when the wrapped strategy is."""
        return recur(self.mapped_strategy)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        """Cacheable exactly when the wrapped strategy is."""
        return recur(self.mapped_strategy)

    def __repr__(self) -> str:
        """Return ``<inner repr>.map(<pack description>)``, computed once."""
        if hasattr(self, "_cached_repr"):
            return self._cached_repr
        inner = repr(self.mapped_strategy)
        described_pack = get_pretty_function_description(self.pack)
        self._cached_repr = f"{inner}.map({described_pack})"
        return self._cached_repr

    def do_validate(self) -> None:
        self.mapped_strategy.validate()

    def do_draw(self, data: ConjectureData) -> MappedTo:
        """Draw from the wrapped strategy and return ``pack(value)``.

        Makes up to three attempts; an attempt that raises
        ``UnsatisfiedAssumption`` has its span discarded and is retried.
        If all three fail, ``UnsatisfiedAssumption`` is raised.
        """
        with warnings.catch_warnings():
            # BytesWarning is silenced only when ``pack`` is itself a
            # Mapping or Set class.
            if isinstance(self.pack, type) and issubclass(
                self.pack, (abc.Mapping, abc.Set)
            ):
                warnings.simplefilter("ignore", BytesWarning)
            for _attempt in range(3):
                try:
                    data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
                    drawn = data.draw(self.mapped_strategy)
                    packed = self.pack(drawn)
                    data.stop_span()
                    current_build_context().record_call(
                        packed, self.pack, [drawn], {}
                    )
                    return packed
                except UnsatisfiedAssumption:
                    data.stop_span(discard=True)
        raise UnsatisfiedAssumption

    @property
    def branches(self) -> Sequence[SearchStrategy[MappedTo]]:
        """One ``MappedStrategy`` (same ``pack``) per branch of the wrapped
        strategy."""
        pack = self.pack
        return [
            MappedStrategy(branch, pack=pack)
            for branch in self.mapped_strategy.branches
        ]

    def filter(
        self, condition: Callable[[MappedTo], Any]
    ) -> "SearchStrategy[MappedTo]":
        """Filter this strategy, pushing the filter into a wrapped list
        strategy when that is possible.

        Includes a special case so that we can rewrite filters on collection
        lengths, when most collections are `st.lists(...).map(the_type)`.
        """
        list_strategy_cls = _list_strategy_type()

        # The rewrite is only attempted for a list strategy mapped through a
        # Collection class or one of the known collection-ish functions.
        if not isinstance(self.mapped_strategy, list_strategy_cls):
            return super().filter(condition)
        pack_builds_collection = (
            isinstance(self.pack, type) and issubclass(self.pack, abc.Collection)
        ) or self.pack in _collection_ish_functions()
        if not pack_builds_collection:
            return super().filter(condition)

        # Check whether our inner list strategy can rewrite this filter
        # condition. If not, discard the result and _only_ apply a new outer
        # filter.
        rewritten = list_strategy_cls.filter(self.mapped_strategy, condition)
        if getattr(rewritten, "filtered_strategy", None) is self.mapped_strategy:
            return super().filter(condition)  # didn't rewrite

        # Apply a new outer filter even though we rewrote the inner strategy,
        # because some collections can change the list length (dict, set, etc).
        remapped = type(self)(rewritten, self.pack)
        return FilteredStrategy(remapped, conditions=(condition,))

1004 

1005 

@lru_cache
def _list_strategy_type() -> Any:
    """Return the ``ListStrategy`` class.

    The import happens inside the function rather than at module import
    time, and ``lru_cache`` means the function body runs only once.
    """
    from hypothesis.strategies._internal.collections import ListStrategy

    return ListStrategy

1011 

1012 

def _collection_ish_functions() -> Sequence[Any]:
    """Return the functions that are treated like collection constructors.

    The result always starts with the builtin ``sorted``.  If a ``numpy``
    module is already present in ``sys.modules`` (NumPy is never imported
    here), the array-creation functions listed below are appended in order.

    Each NumPy name is looked up defensively: a name that the loaded module
    does not provide is skipped instead of raising ``AttributeError``, so a
    NumPy release that drops one of these helpers (or a partially
    initialised module) cannot break the caller.

    Returns:
        A new list of callables on every call.
    """
    funcs: list[Any] = [sorted]
    if np := sys.modules.get("numpy"):
        # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
        # Probably only `np.array` and `np.asarray` will be used in practice,
        # but why should that stop us when we've already gone this far?
        numpy_names = (
            "empty_like",
            "eye",
            "identity",
            "ones_like",
            "zeros_like",
            "array",
            "asarray",
            "asanyarray",
            "ascontiguousarray",
            "asmatrix",
            "copy",
            "rec.array",
            "rec.fromarrays",
            "rec.fromrecords",
            "diag",
            # bonus undocumented functions from tab-completion:
            "asarray_chkfinite",
            "asfortranarray",
        )
        for dotted_name in numpy_names:
            func: Any = np
            try:
                # Walk dotted paths such as "rec.array" one attribute at a time.
                for attr in dotted_name.split("."):
                    func = getattr(func, attr)
            except AttributeError:
                # Not available in this NumPy; leave it out of the list.
                continue
            funcs.append(func)

    return funcs

1041 

1042 

# Sentinel returned by ``FilteredStrategy.do_filtered_draw`` when none of its
# attempts produced a value satisfying the filter; ``FilteredStrategy.do_draw``
# checks for it by identity.
filter_not_satisfied = UniqueIdentifier("filter not satisfied")

1044 

1045 

class FilteredStrategy(SearchStrategy[Ex]):
    """A strategy that only yields values satisfying every stored condition.

    Chained filters are kept flat: ``filtered_strategy`` is never itself a
    ``FilteredStrategy``, and ``flat_conditions`` holds all predicates in the
    order they were applied.
    """

    def __init__(
        self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...]
    ):
        super().__init__()
        if isinstance(strategy, FilteredStrategy):
            # Flatten chained filters into a single filter with multiple
            # conditions: inherit the inner predicates, then add ours.
            merged_conditions = strategy.flat_conditions + conditions
            base_strategy = strategy.filtered_strategy
        else:
            merged_conditions = conditions
            base_strategy = strategy
        self.flat_conditions: tuple[Callable[[Ex], Any], ...] = merged_conditions
        self.filtered_strategy: SearchStrategy[Ex] = base_strategy

        assert isinstance(self.flat_conditions, tuple)
        assert not isinstance(self.filtered_strategy, FilteredStrategy)

        # Combined predicate, built lazily by the ``condition`` property.
        self.__condition: Optional[Callable[[Ex], Any]] = None

    def calc_is_empty(self, recur: RecurT) -> bool:
        """Empty exactly when the underlying strategy is."""
        return recur(self.filtered_strategy)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        """Cacheable exactly when the underlying strategy is."""
        return recur(self.filtered_strategy)

    def __repr__(self) -> str:
        """Return the inner repr followed by one ``.filter(...)`` per
        condition; the string is computed once and cached."""
        if hasattr(self, "_cached_repr"):
            return self._cached_repr
        base = repr(self.filtered_strategy)
        filter_calls = "".join(
            f".filter({get_pretty_function_description(cond)})"
            for cond in self.flat_conditions
        )
        self._cached_repr = base + filter_calls
        return self._cached_repr

    def do_validate(self) -> None:
        """Validate the inner strategy, then replay every condition on it."""
        # Start by validating our inner filtered_strategy. If this was a
        # LazyStrategy, validation also reifies it so that subsequent calls to
        # e.g. `.filter()` will be passed through.
        self.filtered_strategy.validate()
        # So now we have a reified inner strategy, we'll replay all our saved
        # predicates in case some or all of them can be rewritten. Note that
        # this replaces the `fresh` strategy too!
        replayed = self.filtered_strategy
        for cond in self.flat_conditions:
            replayed = replayed.filter(cond)

        if isinstance(replayed, FilteredStrategy):
            # At least some predicates were not rewritten; keep them.
            new_base = replayed.filtered_strategy
            new_conditions = replayed.flat_conditions
        else:
            # *All* the predicates were rewritten, so none remain.
            new_base = replayed
            new_conditions = ()
        # do_validate() is an in-place method, so either way we simply
        # re-initialize this instance.
        FilteredStrategy.__init__(self, new_base, new_conditions)

    def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]":
        """Return a new FilteredStrategy that additionally enforces
        ``condition``."""
        # If we can, it's more efficient to rewrite our strategy to satisfy
        # the condition. We therefore exploit the fact that the order of
        # predicates doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by
        # attempting to apply condition directly to our filtered strategy as
        # the inner-most filter.
        inner = self.filtered_strategy.filter(condition)
        if not isinstance(inner, FilteredStrategy):
            # It *could* be rewritten, so return the more efficient form!
            return FilteredStrategy(inner, self.flat_conditions)
        # It couldn't be rewritten: we got a new FilteredStrategy back, so
        # combine the conditions of each in our expected newest=last order.
        combined = self.flat_conditions + inner.flat_conditions
        return FilteredStrategy(inner.filtered_strategy, combined)

    @property
    def condition(self) -> Callable[[Ex], Any]:
        """A single predicate equivalent to all of ``flat_conditions``."""
        if self.__condition is not None:
            return self.__condition

        how_many = len(self.flat_conditions)
        if how_many == 1:
            # Avoid an extra indirection in the common case of only one condition.
            self.__condition = self.flat_conditions[0]
        elif how_many == 0:
            # Possible, if unlikely, due to filter predicate rewriting
            self.__condition = lambda _: True  # type: ignore # covariant type param
        else:
            self.__condition = lambda x: all(  # type: ignore # covariant type param
                cond(x) for cond in self.flat_conditions
            )
        return self.__condition

    def do_draw(self, data: ConjectureData) -> Ex:
        """Draw a satisfying value, or call ``data.mark_invalid`` if none was
        found."""
        outcome = self.do_filtered_draw(data)
        if outcome is not filter_not_satisfied:
            return cast(Ex, outcome)

        data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")

    def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
        """Try up to three draws; return the first value that passes, else
        the ``filter_not_satisfied`` sentinel."""
        for attempt in range(3):
            data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
            candidate = data.draw(self.filtered_strategy)
            if self.condition(candidate):
                data.stop_span()
                return candidate
            data.stop_span(discard=True)
            if attempt == 0:
                # Record the retry once, after the first rejected draw.
                data.events[f"Retried draw from {self!r} to satisfy filter"] = ""

        return filter_not_satisfied

    @property
    def branches(self) -> Sequence[SearchStrategy[Ex]]:
        """One FilteredStrategy (same conditions) per branch of the inner
        strategy."""
        conditions = self.flat_conditions
        return [
            FilteredStrategy(strategy=branch, conditions=conditions)
            for branch in self.filtered_strategy.branches
        ]

1162 

1163 

@check_function
def check_strategy(arg: object, name: str = "") -> None:
    """Raise ``InvalidArgument`` unless ``arg`` is a ``SearchStrategy``.

    ``name`` is the argument name used in the error message (empty for
    none).  When ``arg`` is a list or tuple the message also suggests
    ``st.sampled_from``.
    """
    assert isinstance(name, str)
    if isinstance(arg, SearchStrategy):
        return

    hint = ""
    if isinstance(arg, (list, tuple)):
        hint = f", such as st.sampled_from({name or '...'}),"
    # Shown as ``name=value`` when a name was supplied, otherwise just the value.
    label = f"{name}=" if name else name
    raise InvalidArgument(
        f"Expected a SearchStrategy{hint} but got {label}{arg!r} "
        f"(type={type(arg).__name__})"
    )