Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/strategies/_internal/strategies.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

453 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import sys 

12import warnings 

13from collections import abc, defaultdict 

14from functools import lru_cache 

15from random import shuffle 

16from typing import ( 

17 TYPE_CHECKING, 

18 Any, 

19 Callable, 

20 ClassVar, 

21 Dict, 

22 Generic, 

23 List, 

24 Sequence, 

25 TypeVar, 

26 Union, 

27 cast, 

28 overload, 

29) 

30 

31from hypothesis._settings import HealthCheck, Phase, Verbosity, settings 

32from hypothesis.control import _current_build_context 

33from hypothesis.errors import ( 

34 HypothesisException, 

35 HypothesisWarning, 

36 InvalidArgument, 

37 NonInteractiveExampleWarning, 

38 UnsatisfiedAssumption, 

39) 

40from hypothesis.internal.conjecture import utils as cu 

41from hypothesis.internal.conjecture.data import ConjectureData 

42from hypothesis.internal.conjecture.utils import ( 

43 calc_label_from_cls, 

44 calc_label_from_name, 

45 combine_labels, 

46) 

47from hypothesis.internal.coverage import check_function 

48from hypothesis.internal.reflection import ( 

49 get_pretty_function_description, 

50 is_identity_function, 

51) 

52from hypothesis.strategies._internal.utils import defines_strategy 

53from hypothesis.utils.conventions import UniqueIdentifier 

54 

# `Ex` is the (covariant) type of the examples a strategy produces; it is the
# type parameter of `SearchStrategy` and the return type of `.example()`.
# Where the available `TypeVar` accepts a `default=` argument, `Ex` defaults
# to `Any`; otherwise it is declared without a default.
# TODO: Use `(3, 13)` once Python 3.13 is released.
if sys.version_info >= (3, 13, 0, "final"):
    Ex = TypeVar("Ex", covariant=True, default=Any)
elif TYPE_CHECKING:
    # Only type checkers take this branch: the `typing_extensions` TypeVar is
    # imported here so that `default=` can be passed.
    from typing_extensions import TypeVar  # type: ignore[assignment]

    Ex = TypeVar("Ex", covariant=True, default=Any)  # type: ignore[call-arg,misc]
else:
    Ex = TypeVar("Ex", covariant=True)

# Additional plain type variables used in the signatures in this module
# (e.g. the `one_of` overloads and `SearchStrategy.map`).
Ex_Inv = TypeVar("Ex_Inv")
T = TypeVar("T")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")

# Sentinel marking "this value is currently being computed". It is stored in
# the `recursive_property` working map and in `SearchStrategy.__label` so that
# re-entrant evaluation can be detected.
calculating = UniqueIdentifier("calculating")

# Label passed to `data.start_example` around each attempt in
# `MappedStrategy.do_draw`.
MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "another attempted draw in MappedStrategy"
)

# Label for one iteration of the filtering loop in FilteredStrategy
# (NOTE(review): the use site is outside the visible part of this file).
FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "single loop iteration in FilteredStrategy"
)

80 

81 

def recursive_property(name, default):
    """Build a lazily cached property that tolerates mutual recursion.

    The returned property resolves its value for a strategy as follows:

    * if the instance has a ``force_<name>`` attribute, that wins;
    * otherwise, if it has a ``cached_<name>`` attribute, that is used;
    * otherwise the value is computed by calling ``calc_<name>(recur)`` on
      the strategy, where ``recur`` is a callback the strategy uses to ask
      for the same property on the strategies it depends on.

    Strategies may depend on each other cyclically (for example
    ``x = st.deferred(lambda: x)``), so a naive recursive evaluation could
    loop forever. Instead a fixed point is computed: every strategy starts
    out at ``default`` ("the value in the absence of evidence to the
    contrary") and values are recomputed until nothing changes any more.

    The approach roughly follows section 4.2 of Adams, Michael D., Celeste
    Hollenbeck, and Matthew Might. "On the complexity and performance of
    parsing with derivatives." ACM SIGPLAN Notices 51.6 (2016): 224-236.
    """
    cache_attr = "cached_" + name
    calc_attr = "calc_" + name
    force_attr = "force_" + name

    def forced_value(target):
        # An explicit override beats a memoised value. If neither attribute
        # exists the AttributeError from the second lookup propagates.
        try:
            return getattr(target, force_attr)
        except AttributeError:
            return getattr(target, cache_attr)

    def accept(self):
        try:
            return forced_value(self)
        except AttributeError:
            pass

        mapping = {}
        unvisited = object()
        hit_recursion = False

        # First pass: plain recursive evaluation. Re-entering a strategy
        # whose value is still being computed is not followed; we record
        # that it happened and answer with the default instead.
        def recur(strat):
            nonlocal hit_recursion
            try:
                return forced_value(strat)
            except AttributeError:
                pass
            previous = mapping.get(strat, unvisited)
            if previous is unvisited:
                mapping[strat] = calculating
                computed = getattr(strat, calc_attr)(recur)
                mapping[strat] = computed
                return computed
            if previous is calculating:
                hit_recursion = True
                return default
            return previous

        recur(self)

        # If the first pass ran into self-recursion, the values in the
        # mapping are only approximations and must be refined by an exact
        # fixed point iteration. Hopefully they are already close, so that
        # only a few rounds are needed.
        if hit_recursion:
            pending = set(mapping)

            # listeners[B] holds every strategy that consulted B while
            # computing its own value; when B changes, those strategies
            # may have to be recomputed.
            listeners = defaultdict(set)
        else:
            pending = None

        def recur2(strat):
            def recur_inner(other):
                try:
                    return forced_value(other)
                except AttributeError:
                    pass
                listeners[other].add(strat)
                known = mapping.get(other, unvisited)
                if known is unvisited:
                    # First time we meet this strategy: seed it with the
                    # default and schedule it for the next round.
                    pending.add(other)
                    mapping[other] = default
                    return default
                return known

            return recur_inner

        rounds = 0
        seen_states = set()
        while pending:
            rounds += 1
            # If stabilising takes suspiciously long, start remembering the
            # states we have been in so that an infinite loop is turned into
            # an assertion failure rather than a hung test. This should be
            # impossible, and most code never reaches the threshold.
            # Note: This is actually covered, by test_very_deep_deferral
            # in tests/cover/test_deferred_strategies.py. Unfortunately it
            # runs into a coverage bug. See
            # https://github.com/nedbat/coveragepy/issues/605
            # for details.
            if rounds > 50:  # pragma: no cover
                state = frozenset(mapping.items())
                assert state not in seen_states, (state, name)
                seen_states.add(state)
            # Rotate: process the current batch while `pending` (which
            # `recur_inner` writes to) collects work for the next round.
            current_batch, pending = pending, set()
            for strat in current_batch:
                updated = getattr(strat, calc_attr)(recur2(strat))
                if updated != mapping[strat]:
                    pending.update(listeners[strat])
                    mapping[strat] = updated

        # The mapping now holds final values for every strategy touched by
        # this computation; cache all of them, not just the one asked for.
        for strat, value in mapping.items():
            setattr(strat, cache_attr, value)
        return getattr(self, cache_attr)

    accept.__name__ = name
    return property(accept)

220 

221 

class SearchStrategy(Generic[Ex]):
    """A SearchStrategy is an object that knows how to explore data of a given
    type.

    Except where noted otherwise, methods on this class are not part of
    the public API and their behaviour may change significantly between
    minor version releases. They will generally be stable between patch
    releases.
    """

    supports_find = True
    # Set by `validate()` once validation has succeeded (or is in progress).
    validate_called = False
    # Lazily computed label; `None` means "not yet computed" and the
    # `calculating` sentinel means "computation in progress".
    __label = None
    __module__ = "hypothesis.strategies"

    def available(self, data):
        """Returns whether this strategy can *currently* draw any
        values. This is typically useful for stateful testing, where a
        ``Bundle`` grows a list of values to choose from over time.

        Unlike the ``is_empty`` property, this method's return value may
        change over time.
        Note: the ``data`` parameter will only be used for introspection and
        no value will be drawn from it.
        """
        return not self.is_empty

    # Returns True if this strategy can never draw a value and will always
    # result in the data being marked invalid.
    # The fact that this returns False does not guarantee that a valid value
    # can be drawn - this is not intended to be perfect, and is primarily
    # intended to be an optimisation for some cases.
    is_empty = recursive_property("is_empty", True)

    # True if values from this strategy can be implicitly reused (e.g. as
    # background values in a numpy array) without causing surprising
    # user-visible behaviour. Should be false for built-in strategies that
    # produce mutable values, and for strategies that have been mapped/filtered
    # by arbitrary user-provided functions.
    has_reusable_values = recursive_property("has_reusable_values", True)

    # Whether this strategy is suitable for holding onto in a cache.
    is_cacheable = recursive_property("is_cacheable", True)

    def calc_is_cacheable(self, recur):
        """Compute ``is_cacheable``; ``recur`` queries it on other strategies."""
        return True

    def calc_is_empty(self, recur):
        """Compute ``is_empty``; ``recur`` queries it on other strategies."""
        # Note: It is correct and significant that the default return value
        # from calc_is_empty is False despite the default value for is_empty
        # being true. The reason for this is that strategies should be treated
        # as empty absent evidence to the contrary, but most basic strategies
        # are trivially non-empty and it would be annoying to have to override
        # this method to show that.
        return False

    def calc_has_reusable_values(self, recur):
        """Compute ``has_reusable_values``; ``recur`` queries other strategies."""
        return False

    def example(self) -> Ex:
        """Provide an example of the sort of value that this strategy
        generates. This is biased to be slightly simpler than is typical for
        values from this strategy, for clarity purposes.

        This method shouldn't be taken too seriously. It's here for interactive
        exploration of the API, not for any sort of real testing.

        This method is part of the public API.
        """
        if getattr(sys, "ps1", None) is None:  # pragma: no branch
            # The other branch *is* covered in cover/test_examples.py; but as that
            # uses `pexpect` for an interactive session `coverage` doesn't see it.
            warnings.warn(
                "The `.example()` method is good for exploring strategies, but should "
                "only be used interactively.  We recommend using `@given` for tests - "
                "it performs better, saves and replays failures to avoid flakiness, "
                "and reports minimal examples. (strategy: %r)" % (self,),
                NonInteractiveExampleWarning,
                stacklevel=2,
            )

        context = _current_build_context.value
        if context is not None:
            if context.data is not None and context.data.depth > 0:
                raise HypothesisException(
                    "Using example() inside a strategy definition is a bad "
                    "idea. Instead consider using hypothesis.strategies.builds() "
                    "or @hypothesis.strategies.composite to define your strategy."
                    " See https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#hypothesis.strategies.builds or "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#composite-strategies for more details."
                )
            else:
                raise HypothesisException(
                    "Using example() inside a test function is a bad "
                    "idea. Instead consider using hypothesis.strategies.data() "
                    "to draw more examples during testing. See "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#drawing-interactively-in-tests for more details."
                )

        # Serve from the buffer of previously generated examples if there is
        # one; otherwise (no buffer yet, or buffer exhausted) refill it below.
        try:
            return self.__examples.pop()
        except (AttributeError, IndexError):
            self.__examples: List[Ex] = []

        from hypothesis.core import given

        # Note: this function has a weird name because it might appear in
        # tracebacks, and we want users to know that they can ignore it.
        @given(self)
        @settings(
            database=None,
            max_examples=100,
            deadline=None,
            verbosity=Verbosity.quiet,
            phases=(Phase.generate,),
            suppress_health_check=list(HealthCheck),
        )
        def example_generating_inner_function(ex):
            self.__examples.append(ex)

        example_generating_inner_function()
        shuffle(self.__examples)
        return self.__examples.pop()

    def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
        """Returns a new strategy that generates values by generating a value
        from this strategy and then calling pack() on the result, giving that.

        This method is part of the public API.
        """
        if is_identity_function(pack):
            return self  # type: ignore  # Mypy has no way to know that `Ex == T`
        return MappedStrategy(self, pack=pack)

    def flatmap(
        self, expand: Callable[[Ex], "SearchStrategy[T]"]
    ) -> "SearchStrategy[T]":
        """Returns a new strategy that generates values by generating a value
        from this strategy, say x, then generating a value from
        strategy(expand(x))

        This method is part of the public API.
        """
        from hypothesis.strategies._internal.flatmapped import FlatMapStrategy

        return FlatMapStrategy(expand=expand, strategy=self)

    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
        """Returns a new strategy that generates values from this strategy
        which satisfy the provided condition. Note that if the condition is too
        hard to satisfy this might result in your tests failing with
        Unsatisfiable.

        This method is part of the public API.
        """
        return FilteredStrategy(conditions=(condition,), strategy=self)

    def _filter_for_filtered_draw(self, condition):
        # Hook for parent strategies that want to perform fallible filtering
        # on one of their internal strategies (e.g. UniqueListStrategy).
        # The returned object must have a `.do_filtered_draw(data)` method
        # that behaves like `do_draw`, but returns the sentinel object
        # `filter_not_satisfied` if the condition could not be satisfied.

        # This is separate from the main `filter` method so that strategies
        # can override `filter` without having to also guarantee a
        # `do_filtered_draw` method.
        return FilteredStrategy(conditions=(condition,), strategy=self)

    @property
    def branches(self) -> List["SearchStrategy[Ex]"]:
        """The alternatives this strategy consists of; by default just itself."""
        return [self]

    def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
        """Return a strategy which produces values by randomly drawing from one
        of this strategy or the other strategy.

        This method is part of the public API.
        """
        if not isinstance(other, SearchStrategy):
            raise ValueError(f"Cannot | a SearchStrategy with {other!r}")
        return OneOfStrategy((self, other))

    def __bool__(self) -> bool:
        """Always True, but warn: truth-testing a strategy is usually a bug."""
        warnings.warn(
            f"bool({self!r}) is always True, did you mean to draw a value?",
            HypothesisWarning,
            stacklevel=2,
        )
        return True

    def validate(self) -> None:
        """Throw an exception if the strategy is not valid.

        This can happen due to lazy construction
        """
        if self.validate_called:
            return
        try:
            # The flag is set *before* validating so that re-entrant calls
            # (e.g. from mutually recursive strategies) return immediately.
            self.validate_called = True
            self.do_validate()
            # Accessed for their side effect: this computes and caches the
            # recursive properties.
            self.is_empty
            self.has_reusable_values
        except Exception:
            self.validate_called = False
            raise

    # Cache of computed class labels, keyed by strategy class.
    LABELS: ClassVar[Dict[type, int]] = {}

    @property
    def class_label(self):
        """The label derived from this strategy's class (cached in LABELS)."""
        cls = self.__class__
        try:
            return cls.LABELS[cls]
        except KeyError:
            pass
        result = calc_label_from_cls(cls)
        cls.LABELS[cls] = result
        return result

    @property
    def label(self) -> int:
        """The label of this strategy, computed once via ``calc_label()``.

        While the computation is in progress, re-entrant access returns 0.
        If ``calc_label()`` raises, the in-progress marker is removed again
        so that the next access retries (and re-raises) instead of silently
        returning 0 forever; this mirrors how ``validate()`` resets
        ``validate_called`` on failure.
        """
        if self.__label is calculating:
            return 0
        if self.__label is None:
            self.__label = calculating
            try:
                self.__label = self.calc_label()
            except BaseException:
                # Don't leave the strategy stuck in the "calculating" state.
                self.__label = None
                raise
        return cast(int, self.__label)

    def calc_label(self):
        """Compute this strategy's label; by default the class label."""
        return self.class_label

    def do_validate(self):
        """Hook for subclasses to perform their own validation."""
        pass

    def do_draw(self, data: ConjectureData) -> Ex:
        """Draw a value using ``data``; must be implemented by subclasses."""
        raise NotImplementedError(f"{type(self).__name__}.do_draw")

    def __init__(self):
        pass

468 

469 

def is_simple_data(value):
    """Return True if ``value`` can be hashed, and False if hashing it
    raises ``TypeError``."""
    try:
        hash(value)
    except TypeError:
        return False
    return True

476 

477 

class SampledFromStrategy(SearchStrategy):
    """A strategy which samples from a set of elements. This is essentially
    equivalent to using a OneOfStrategy over Just strategies but may be more
    efficient and convenient.

    ``.map`` and ``.filter`` do not wrap this strategy; instead they return a
    new SampledFromStrategy with the function appended to an ordered tuple of
    ``("map" | "filter", function)`` transformations, which are applied to the
    sampled element at draw time.
    """

    # Upper bound on the number of elements tried in a single filtered draw.
    _MAX_FILTER_CALLS = 10_000

    def __init__(self, elements, repr_=None, transformations=()):
        super().__init__()
        self.elements = cu.check_sample(elements, "sampled_from")
        assert self.elements
        self.repr_ = repr_
        self._transformations = transformations

    def map(self, pack):
        """Return a copy of this strategy with ``pack`` appended as a map step."""
        return type(self)(
            self.elements,
            repr_=self.repr_,
            transformations=(*self._transformations, ("map", pack)),
        )

    def filter(self, condition):
        """Return a copy of this strategy with ``condition`` appended as a
        filter step."""
        return type(self)(
            self.elements,
            repr_=self.repr_,
            transformations=(*self._transformations, ("filter", condition)),
        )

    def __repr__(self):
        return (
            self.repr_
            or "sampled_from(["
            + ", ".join(map(get_pretty_function_description, self.elements))
            + "])"
        ) + "".join(
            f".{name}({get_pretty_function_description(f)})"
            for name, f in self._transformations
        )

    def calc_has_reusable_values(self, recur):
        # Because our custom .map/.filter implementations skip the normal
        # wrapper strategies (which would automatically return False for us),
        # we need to manually return False here if any transformations have
        # been applied.
        return not self._transformations

    def calc_is_cacheable(self, recur):
        return is_simple_data(self.elements)

    def _transform(self, element):
        """Apply the recorded transformations to ``element`` in order.

        Returns the transformed element, or ``filter_not_satisfied`` as soon
        as one of the filter steps rejects the (partially transformed) value.
        """
        # Used in UniqueSampledListStrategy
        for name, f in self._transformations:
            if name == "map":
                result = f(element)
                # There may be no active build context; only record the call
                # when there is one.
                if build_context := _current_build_context.value:
                    build_context.record_call(result, f, [element], {})
                element = result
            else:
                assert name == "filter"
                if not f(element):
                    return filter_not_satisfied
        return element

    def do_draw(self, data):
        """Draw an element, marking ``data`` invalid if every attempt was
        rejected by the filters."""
        result = self.do_filtered_draw(data)
        if isinstance(result, SearchStrategy) and all(
            isinstance(x, SearchStrategy) for x in self.elements
        ):
            # Every element is itself a strategy, which usually means the
            # user wanted one_of(); leave a (format string, elements) hint
            # on the data object. The function is called `sampled_from`, so
            # the message names it that way.
            data._sampled_from_all_strategies_elements_message = (
                "sampled_from was given a collection of strategies: "
                "{!r}. Was one_of intended?",
                self.elements,
            )
        if result is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
        return result

    def get_element(self, i):
        """Return the transformed element at index ``i`` (or
        ``filter_not_satisfied``)."""
        return self._transform(self.elements[i])

    def do_filtered_draw(self, data):
        """Like ``do_draw``, but return ``filter_not_satisfied`` instead of
        marking the data invalid when no acceptable element is found."""
        # Set of indices that have been tried so far, so that we never test
        # the same element twice during a draw.
        known_bad_indices = set()

        # Start with ordinary rejection sampling. It's fast if it works, and
        # if it doesn't work then it was only a small amount of overhead.
        for _ in range(3):
            i = data.draw_integer(0, len(self.elements) - 1)
            if i not in known_bad_indices:
                element = self.get_element(i)
                if element is not filter_not_satisfied:
                    return element
                if not known_bad_indices:
                    data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
                known_bad_indices.add(i)

        # If we've tried all the possible elements, give up now.
        max_good_indices = len(self.elements) - len(known_bad_indices)
        if not max_good_indices:
            return filter_not_satisfied

        # Impose an arbitrary cutoff to prevent us from wasting too much time
        # on very large element lists.
        max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)

        # Before building the list of allowed indices, speculatively choose
        # one of them. We don't yet know how many allowed indices there will be,
        # so this choice might be out-of-bounds, but that's OK.
        speculative_index = data.draw_integer(0, max_good_indices - 1)

        # Calculate the indices of allowed values, so that we can choose one
        # of them at random. But if we encounter the speculatively-chosen one,
        # just use that and return immediately.  Note that we also track the
        # allowed elements, in case of .map(some_stateful_function)
        allowed = []
        for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
            if i not in known_bad_indices:
                element = self.get_element(i)
                if element is not filter_not_satisfied:
                    allowed.append((i, element))
                    if len(allowed) > speculative_index:
                        # Early-exit case: We reached the speculative index, so
                        # we just return the corresponding element.
                        data.draw_integer(0, len(self.elements) - 1, forced=i)
                        return element

        # The speculative index didn't work out, but at this point we've built
        # and can choose from the complete list of allowed indices and elements.
        if allowed:
            i, element = data.choice(allowed)
            data.draw_integer(0, len(self.elements) - 1, forced=i)
            return element
        # If there are no allowed indices, the filter couldn't be satisfied.
        return filter_not_satisfied

614 

615 

class OneOfStrategy(SearchStrategy[Ex]):
    """Union of several strategies: the values generated are values that
    any one of the given strategies could have generated.

    The conditional distribution draws uniformly at random from some
    non-empty subset of these strategies and then draws from the
    conditional distribution of that strategy.
    """

    def __init__(self, strategies):
        super().__init__()
        # Materialise the argument once so that any iterable is accepted.
        self.original_strategies = list(tuple(strategies))
        # Flattened, de-duplicated, non-empty branches; computed on demand.
        self.__element_strategies = None
        # Re-entrancy guard for the `branches` property.
        self.__in_branches = False

    def calc_is_empty(self, recur):
        # Empty only if every alternative is empty.
        return all(map(recur, self.original_strategies))

    def calc_has_reusable_values(self, recur):
        return all(map(recur, self.original_strategies))

    def calc_is_cacheable(self, recur):
        return all(map(recur, self.original_strategies))

    @property
    def element_strategies(self):
        """The non-empty branches of all alternatives, without duplicates
        and in first-seen order (computed once, then cached)."""
        if self.__element_strategies is not None:
            return self.__element_strategies

        # While strategies are hashable, they use object.__hash__ and are
        # therefore distinguished only by identity.
        #
        # In principle we could "just" define a __hash__ method
        # (and __eq__, but that's easy in terms of type() and hash())
        # to make this more powerful, but this is harder than it sounds:
        #
        # 1. Strategies are often distinguished by non-hashable attributes,
        #    or by attributes that have the same hash value ("^.+" / b"^.+").
        # 2. LazyStrategy: can't reify the wrapped strategy without breaking
        #    laziness, so there's a hash each for the lazy and the nonlazy.
        #
        # Having made several attempts, the minor benefits of making strategies
        # hashable are simply not worth the engineering effort it would take.
        # See also issues #2291 and #2327.
        already_seen = {self}
        flattened = []
        for alternative in self.original_strategies:
            check_strategy(alternative)
            if alternative.is_empty:
                continue
            for branch in alternative.branches:
                if branch in already_seen:
                    continue
                if branch.is_empty:
                    continue
                already_seen.add(branch)
                flattened.append(branch)
        self.__element_strategies = flattened
        return self.__element_strategies

    def calc_label(self):
        child_labels = (s.label for s in self.original_strategies)
        return combine_labels(self.class_label, *child_labels)

    def do_draw(self, data: ConjectureData) -> Ex:
        # First pick one of the currently available branches, then draw
        # a value from the branch that was picked.
        branch = data.draw(
            SampledFromStrategy(self.element_strategies).filter(
                lambda s: s.available(data)
            )
        )
        return data.draw(branch)

    def __repr__(self):
        parts = [repr(s) for s in self.original_strategies]
        return "one_of(%s)" % ", ".join(parts)

    def do_validate(self):
        for branch in self.element_strategies:
            branch.validate()

    @property
    def branches(self):
        """The flattened element strategies, or ``[self]`` if this property
        is accessed again while it is already being evaluated."""
        if self.__in_branches:
            return [self]
        self.__in_branches = True
        try:
            return self.element_strategies
        finally:
            self.__in_branches = False

    def filter(self, condition):
        # Push the condition down into every alternative, then wrap the
        # resulting union in a FilteredStrategy without extra conditions.
        filtered_alternatives = [
            s.filter(condition) for s in self.original_strategies
        ]
        return FilteredStrategy(
            OneOfStrategy(filtered_alternatives),
            conditions=(),
        )

707 

708 

@overload
def one_of(
    __args: Sequence[SearchStrategy[Any]],
) -> SearchStrategy[Any]:  # pragma: no cover
    ...


@overload
def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
) -> SearchStrategy[Union[Ex, T]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
) -> SearchStrategy[Union[Ex, T, T3]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
) -> SearchStrategy[Union[Ex, T, T3, T4]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
    __a5: SearchStrategy[T5],
) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]:  # pragma: no cover
    ...


@overload
def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]:  # pragma: no cover
    ...


@defines_strategy(never_lazy=True)
def one_of(
    *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
) -> SearchStrategy[Any]:
    """Return a strategy which generates values from any of the argument
    strategies.

    This may be called with one iterable argument instead of multiple
    strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are
    equivalent.

    Examples from this strategy will generally shrink to ones that come from
    strategies earlier in the list, then shrink according to behaviour of the
    strategy that produced them. In order to get good shrinking behaviour,
    try to put simpler strategies first. e.g. ``one_of(none(), text())`` is
    better than ``one_of(text(), none())``.

    This is especially important when using recursive strategies. e.g.
    ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
    but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
    very badly indeed.
    """
    # Mypy workaround alert:  Any is too loose above; the return parameter
    # should be the union of the input parameters.  Unfortunately, Mypy <=0.600
    # raises errors due to incompatible inputs instead.  See #1270 for links.
    # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
    if len(args) == 1:
        (only,) = args
        if not isinstance(only, SearchStrategy):
            # A single non-strategy argument is treated as an iterable of
            # strategies if it can be iterated; otherwise it is left alone.
            try:
                args = tuple(only)
            except TypeError:
                pass

    if len(args) == 1 and isinstance(args[0], SearchStrategy):
        # With exactly one strategy there is nothing to choose between, so
        # hand it back unchanged: no overhead from an extra wrapper for
        # lists of any size, and the repr stays simple.
        return args[0]

    if args and all(not isinstance(a, SearchStrategy) for a in args):
        # Not a single strategy among the arguments: most likely the user
        # confused `one_of()` with `sampled_from()`, so say so explicitly.
        # The remaining validation is left to OneOfStrategy.  See PR #2627.
        raise InvalidArgument(
            f"Did you mean st.sampled_from({list(args)!r})?  st.one_of() is used "
            "to combine strategies, but all of the arguments were of other types."
        )

    return OneOfStrategy(args)

806 

807 

class MappedStrategy(SearchStrategy[Ex]):
    """A strategy which is defined purely by conversion to and from another
    strategy.

    Its parameter and distribution come from that other strategy.
    """

    def __init__(self, strategy, pack):
        super().__init__()
        self.mapped_strategy = strategy
        self.pack = pack

    def calc_is_empty(self, recur):
        # Mapping cannot add or remove values, so defer to the source strategy.
        return recur(self.mapped_strategy)

    def calc_is_cacheable(self, recur):
        return recur(self.mapped_strategy)

    def __repr__(self):
        # The repr is computed once and then reused.
        if not hasattr(self, "_cached_repr"):
            self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})"
        return self._cached_repr

    def do_validate(self):
        self.mapped_strategy.validate()

    def do_draw(self, data: ConjectureData) -> Any:
        """Draw from the wrapped strategy and return ``pack`` of the value.

        Up to three attempts are made; an attempt in which
        ``UnsatisfiedAssumption`` is raised is discarded. If all attempts
        fail, ``UnsatisfiedAssumption`` is raised.
        """
        with warnings.catch_warnings():
            if isinstance(self.pack, type) and issubclass(
                self.pack, (abc.Mapping, abc.Set)
            ):
                # Suppress BytesWarning while building mappings and sets.
                warnings.simplefilter("ignore", BytesWarning)
            for _ in range(3):
                try:
                    data.start_example(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
                    x = data.draw(self.mapped_strategy)
                    result = self.pack(x)  # type: ignore
                    data.stop_example()
                    # There is not necessarily an active build context (the
                    # value may be None, as handled in SearchStrategy.example
                    # and SampledFromStrategy._transform); recording the call
                    # is only done when there is one, instead of failing with
                    # an AttributeError after the value was already drawn.
                    build_context = _current_build_context.value
                    if build_context is not None:
                        build_context.record_call(result, self.pack, [x], {})
                    return result
                except UnsatisfiedAssumption:
                    data.stop_example(discard=True)
        raise UnsatisfiedAssumption

    @property
    def branches(self) -> List[SearchStrategy[Ex]]:
        """One MappedStrategy (with the same ``pack``) per branch of the
        wrapped strategy."""
        return [
            MappedStrategy(strategy, pack=self.pack)
            for strategy in self.mapped_strategy.branches
        ]

    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
        # Includes a special case so that we can rewrite filters on collection
        # lengths, when most collections are `st.lists(...).map(the_type)`.
        ListStrategy = _list_strategy_type()
        if not isinstance(self.mapped_strategy, ListStrategy) or not (
            (isinstance(self.pack, type) and issubclass(self.pack, abc.Collection))
            or self.pack in _collection_ish_functions()
        ):
            return super().filter(condition)

        # Check whether our inner list strategy can rewrite this filter condition.
        # If not, discard the result and _only_ apply a new outer filter.
        new = ListStrategy.filter(self.mapped_strategy, condition)
        if getattr(new, "filtered_strategy", None) is self.mapped_strategy:
            return super().filter(condition)  # didn't rewrite

        # Apply a new outer filter even though we rewrote the inner strategy,
        # because some collections can change the list length (dict, set, etc).
        return FilteredStrategy(type(self)(new, self.pack), conditions=(condition,))

878 

879 

@lru_cache
def _list_strategy_type():
    """Return the ``ListStrategy`` class from the internal collections module.

    The import happens inside the function rather than at module import time
    (presumably to avoid a circular import -- confirm), and ``lru_cache``
    means the body only runs on the first call.
    """
    from hypothesis.strategies._internal import collections as _collections

    return _collections.ListStrategy

885 

886 

def _collection_ish_functions():
    """Return a fresh list of callables treated like collection constructors.

    The list always starts with ``sorted``. If numpy is already present in
    ``sys.modules``, its array-creation functions are appended; numpy is
    never imported by this function.
    """
    np = sys.modules.get("numpy")
    if not np:
        return [sorted]

    # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
    # Probably only `np.array` and `np.asarray` will be used in practice,
    # but why should that stop us when we've already gone this far?
    return [
        sorted,
        np.empty_like,
        np.eye,
        np.identity,
        np.ones_like,
        np.zeros_like,
        np.array,
        np.asarray,
        np.asanyarray,
        np.ascontiguousarray,
        np.asmatrix,
        np.copy,
        np.rec.array,
        np.rec.fromarrays,
        np.rec.fromrecords,
        np.diag,
        # bonus undocumented functions from tab-completion:
        np.asarray_chkfinite,
        np.asfortranarray,
    ]

915 

916 

# Sentinel returned by FilteredStrategy.do_filtered_draw() when none of its
# attempts produced a value passing the filter; FilteredStrategy.do_draw()
# checks for it by identity.
filter_not_satisfied = UniqueIdentifier("filter not satisfied")

918 

919 

class FilteredStrategy(SearchStrategy[Ex]):
    """Strategy yielding values of ``filtered_strategy`` that pass every predicate.

    The predicates are held in the tuple ``flat_conditions``. A draw is
    attempted up to three times; if no attempt passes, the test data is
    marked invalid.
    """

    def __init__(self, strategy, conditions):
        """Wrap ``strategy`` with the tuple of predicates ``conditions``.

        If ``strategy`` is already a ``FilteredStrategy``, its predicates are
        merged in front of ``conditions`` and its inner strategy is wrapped
        instead, so filters never nest.
        """
        super().__init__()
        # Flatten chained filters into a single filter with multiple conditions.
        chained = isinstance(strategy, FilteredStrategy)
        self.flat_conditions = (
            strategy.flat_conditions + conditions if chained else conditions
        )
        self.filtered_strategy = strategy.filtered_strategy if chained else strategy

        assert isinstance(self.flat_conditions, tuple)
        assert not isinstance(self.filtered_strategy, FilteredStrategy)

        # Combined predicate, built on first access of ``condition``.
        self.__condition = None

    def calc_is_empty(self, recur):
        """Return whatever ``recur`` reports for the inner strategy."""
        inner = self.filtered_strategy
        return recur(inner)

    def calc_is_cacheable(self, recur):
        """Return whatever ``recur`` reports for the inner strategy."""
        inner = self.filtered_strategy
        return recur(inner)

    def __repr__(self):
        """Return the inner repr followed by one ``.filter(...)`` per predicate.

        The string is computed once and stored as ``_cached_repr``.
        """
        if hasattr(self, "_cached_repr"):
            return self._cached_repr
        filter_calls = [
            f".filter({get_pretty_function_description(cond)})"
            for cond in self.flat_conditions
        ]
        self._cached_repr = "{!r}{}".format(
            self.filtered_strategy, "".join(filter_calls)
        )
        return self._cached_repr

    def do_validate(self):
        """Validate the inner strategy, then replay the predicates onto it.

        This method works in place: the instance is re-initialised with
        whatever strategy and predicates remain after the replay.
        """
        # Validate the inner strategy first. If this was a LazyStrategy,
        # validation also reifies it so that subsequent calls to e.g.
        # `.filter()` will be passed through.
        self.filtered_strategy.validate()
        # With a reified inner strategy, replay every saved predicate in case
        # some or all of them can now be rewritten.
        rebuilt = self.filtered_strategy
        for predicate in self.flat_conditions:
            rebuilt = rebuilt.filter(predicate)
        if isinstance(rebuilt, FilteredStrategy):
            # At least some predicates were not rewritten: keep those.
            base, remaining = rebuilt.filtered_strategy, rebuilt.flat_conditions
        else:
            # *All* predicates were rewritten, so none remain to check.
            base, remaining = rebuilt, ()
        FilteredStrategy.__init__(self, base, remaining)

    def filter(self, condition):
        """Return a new ``FilteredStrategy`` that also enforces ``condition``.

        Predicate order doesn't matter (`f(x) and g(x) == g(x) and f(x)`), so
        ``condition`` is first offered to the inner strategy as the
        inner-most filter, in case it can be rewritten into a more efficient
        strategy.
        """
        rewritten = self.filtered_strategy.filter(condition)
        if not isinstance(rewritten, FilteredStrategy):
            # The inner strategy absorbed the condition: keep our predicates
            # on top of the more efficient form.
            return FilteredStrategy(rewritten, self.flat_conditions)
        # It could not be rewritten, so we got a plain filter back; combine
        # the predicates in the expected newest=last order.
        return FilteredStrategy(
            rewritten.filtered_strategy,
            self.flat_conditions + rewritten.flat_conditions,
        )

    @property
    def condition(self):
        """Single callable equivalent to all of ``flat_conditions``.

        Built on first access and cached on the instance.
        """
        if self.__condition is not None:
            return self.__condition
        count = len(self.flat_conditions)
        if count == 0:
            # Possible, if unlikely, due to filter predicate rewriting
            self.__condition = lambda _: True
        elif count == 1:
            # Avoid an extra indirection in the common case of only one condition.
            self.__condition = self.flat_conditions[0]
        else:
            self.__condition = lambda x: all(
                cond(x) for cond in self.flat_conditions
            )
        return self.__condition

    def do_draw(self, data: ConjectureData) -> Ex:
        """Return a value passing the filter, or mark ``data`` invalid."""
        result = self.do_filtered_draw(data)
        if result is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
            raise NotImplementedError("Unreachable, for Mypy")
        return result

    def do_filtered_draw(self, data):
        """Try up to three draws; return the first value passing ``condition``.

        Returns ``filter_not_satisfied`` if every attempt fails. After the
        first failed attempt a "Retried draw" entry is added to
        ``data.events``.
        """
        for attempt in range(3):
            data.start_example(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
            candidate = data.draw(self.filtered_strategy)
            if self.condition(candidate):
                data.stop_example()
                return candidate
            data.stop_example(discard=True)
            if attempt == 0:
                data.events[f"Retried draw from {self!r} to satisfy filter"] = ""

        return filter_not_satisfied

    @property
    def branches(self) -> List[SearchStrategy[Ex]]:
        """Return one ``FilteredStrategy`` per branch of the inner strategy.

        Each carries this instance's ``flat_conditions``.
        """
        filtered_branches = []
        for branch in self.filtered_strategy.branches:
            filtered_branches.append(
                FilteredStrategy(strategy=branch, conditions=self.flat_conditions)
            )
        return filtered_branches

1033 

1034 

@check_function
def check_strategy(arg, name=""):
    """Raise ``InvalidArgument`` unless ``arg`` is a ``SearchStrategy``.

    ``name`` must be a string. When non-empty it appears in the error message
    as ``name=`` before the repr of ``arg``. For a list or tuple the message
    also suggests ``st.sampled_from(...)``.
    """
    assert isinstance(name, str)
    if isinstance(arg, SearchStrategy):
        return
    # The hint uses the bare name, before "=" is appended for the message.
    hint = (
        ", such as st.sampled_from({}),".format(name or "...")
        if isinstance(arg, (list, tuple))
        else ""
    )
    label = name + "=" if name else name
    raise InvalidArgument(
        "Expected a SearchStrategy%s but got %s%r (type=%s)"
        % (hint, label, arg, type(arg).__name__)
    )