Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/strategies/_internal/strategies.py: 66%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

483 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import sys 

12import warnings 

13from collections import abc, defaultdict 

14from collections.abc import Sequence 

15from functools import lru_cache 

16from random import shuffle 

17from typing import ( 

18 TYPE_CHECKING, 

19 Any, 

20 Callable, 

21 ClassVar, 

22 Generic, 

23 Literal, 

24 Optional, 

25 TypeVar, 

26 Union, 

27 cast, 

28 overload, 

29) 

30 

31from hypothesis._settings import HealthCheck, Phase, Verbosity, settings 

32from hypothesis.control import _current_build_context, current_build_context 

33from hypothesis.errors import ( 

34 HypothesisException, 

35 HypothesisWarning, 

36 InvalidArgument, 

37 NonInteractiveExampleWarning, 

38 UnsatisfiedAssumption, 

39) 

40from hypothesis.internal.conjecture import utils as cu 

41from hypothesis.internal.conjecture.data import ConjectureData 

42from hypothesis.internal.conjecture.utils import ( 

43 calc_label_from_cls, 

44 calc_label_from_hash, 

45 calc_label_from_name, 

46 combine_labels, 

47) 

48from hypothesis.internal.coverage import check_function 

49from hypothesis.internal.reflection import ( 

50 get_pretty_function_description, 

51 is_identity_function, 

52) 

53from hypothesis.strategies._internal.utils import defines_strategy 

54from hypothesis.utils.conventions import UniqueIdentifier 

55 

56if TYPE_CHECKING: 

57 from typing import TypeAlias 

58 

59 Ex = TypeVar("Ex", covariant=True, default=Any) 

60else: 

61 Ex = TypeVar("Ex", covariant=True) 

62 

63T = TypeVar("T") 

64T3 = TypeVar("T3") 

65T4 = TypeVar("T4") 

66T5 = TypeVar("T5") 

67MappedFrom = TypeVar("MappedFrom") 

68MappedTo = TypeVar("MappedTo") 

69RecurT: "TypeAlias" = Callable[["SearchStrategy"], bool] 

70calculating = UniqueIdentifier("calculating") 

71 

72MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name( 

73 "another attempted draw in MappedStrategy" 

74) 

75 

76FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name( 

77 "single loop iteration in FilteredStrategy" 

78) 

79 

80 

81def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any: 

82 """Handle properties which may be mutually recursive among a set of 

83 strategies. 

84 

85 These are essentially lazily cached properties, with the ability to set 

86 an override: If the property has not been explicitly set, we calculate 

87 it on first access and memoize the result for later. 

88 

89 The problem is that for properties that depend on each other, a naive 

90 calculation strategy may hit infinite recursion. Consider for example 

91 the property is_empty. A strategy defined as x = st.deferred(lambda: x) 

92 is certainly empty (in order to draw a value from x we would have to 

93 draw a value from x, for which we would have to draw a value from x, 

94 ...), but in order to calculate it the naive approach would end up 

95 calling x.is_empty in order to calculate x.is_empty in order to etc. 

96 

97 The solution is one of fixed point calculation. We start with a default 

98 value that is the value of the property in the absence of evidence to 

99 the contrary, and then update the values of the property for all 

100 dependent strategies until we reach a fixed point. 

101 

102 The approach taken roughly follows that in section 4.2 of Adams, 

103 Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity 

104 and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6 

105 (2016): 224-236. 

106 """ 

107 cache_key = "cached_" + name 

108 calculation = "calc_" + name 

109 force_key = "force_" + name 

110 

111 def forced_value(target: SearchStrategy) -> Any: 

112 try: 

113 return getattr(target, force_key) 

114 except AttributeError: 

115 return getattr(target, cache_key) 

116 

117 try: 

118 return forced_value(strategy) 

119 except AttributeError: 

120 pass 

121 

122 mapping: dict[SearchStrategy, Any] = {} 

123 sentinel = object() 

124 hit_recursion = False 

125 

126 # For a first pass we do a direct recursive calculation of the 

127 # property, but we block recursively visiting a value in the 

128 # computation of its property: When that happens, we simply 

129 # note that it happened and return the default value. 

130 def recur(strat: SearchStrategy) -> Any: 

131 nonlocal hit_recursion 

132 try: 

133 return forced_value(strat) 

134 except AttributeError: 

135 pass 

136 result = mapping.get(strat, sentinel) 

137 if result is calculating: 

138 hit_recursion = True 

139 return default 

140 elif result is sentinel: 

141 mapping[strat] = calculating 

142 mapping[strat] = getattr(strat, calculation)(recur) 

143 return mapping[strat] 

144 return result 

145 

146 recur(strategy) 

147 

148 # If we hit self-recursion in the computation of any strategy 

149 # value, our mapping at the end is imprecise - it may or may 

150 # not have the right values in it. We now need to proceed with 

151 # a more careful fixed point calculation to get the exact 

152 # values. Hopefully our mapping is still pretty good and it 

153 # won't take a large number of updates to reach a fixed point. 

154 if hit_recursion: 

155 needs_update = set(mapping) 

156 

157 # We track which strategies use which in the course of 

158 # calculating their property value. If A ever uses B in 

159 # the course of calculating its value, then whenever the 

160 # value of B changes we might need to update the value of 

161 # A. 

162 listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set) 

163 else: 

164 needs_update = None 

165 

166 def recur2(strat: SearchStrategy) -> Any: 

167 def recur_inner(other: SearchStrategy) -> Any: 

168 try: 

169 return forced_value(other) 

170 except AttributeError: 

171 pass 

172 listeners[other].add(strat) 

173 result = mapping.get(other, sentinel) 

174 if result is sentinel: 

175 assert needs_update is not None 

176 needs_update.add(other) 

177 mapping[other] = default 

178 return default 

179 return result 

180 

181 return recur_inner 

182 

183 count = 0 

184 seen = set() 

185 while needs_update: 

186 count += 1 

187 # If we seem to be taking a really long time to stabilize we 

188 # start tracking seen values to attempt to detect an infinite 

189 # loop. This should be impossible, and most code will never 

190 # hit the count, but having an assertion for it means that 

191 # testing is easier to debug and we don't just have a hung 

192 # test. 

193 # Note: This is actually covered, by test_very_deep_deferral 

194 # in tests/cover/test_deferred_strategies.py. Unfortunately it 

195 # runs into a coverage bug. See 

196 # https://github.com/nedbat/coveragepy/issues/605 

197 # for details. 

198 if count > 50: # pragma: no cover 

199 key = frozenset(mapping.items()) 

200 assert key not in seen, (key, name) 

201 seen.add(key) 

202 to_update = needs_update 

203 needs_update = set() 

204 for strat in to_update: 

205 new_value = getattr(strat, calculation)(recur2(strat)) 

206 if new_value != mapping[strat]: 

207 needs_update.update(listeners[strat]) 

208 mapping[strat] = new_value 

209 

210 # We now have a complete and accurate calculation of the 

211 # property values for everything we have seen in the course of 

212 # running this calculation. We simultaneously update all of 

213 # them (not just the strategy we started out with). 

214 for k, v in mapping.items(): 

215 setattr(k, cache_key, v) 

216 return getattr(strategy, cache_key) 

217 

218 

219class SearchStrategy(Generic[Ex]): 

220 """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input. 

221 

222 This class is only part of the public API for use in type annotations, so that 

223 you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns 

224 ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class. 

225 """ 

226 

227 validate_called: bool = False 

228 __label: Union[int, UniqueIdentifier, None] = None 

229 __module__: str = "hypothesis.strategies" 

230 

231 def _available(self, data: ConjectureData) -> bool: 

232 """Returns whether this strategy can *currently* draw any 

233 values. This typically useful for stateful testing where ``Bundle`` 

234 grows over time a list of value to choose from. 

235 

236 Unlike ``empty`` property, this method's return value may change 

237 over time. 

238 Note: ``data`` parameter will only be used for introspection and no 

239 value drawn from it. 

240 """ 

241 return not self.is_empty 

242 

243 @property 

244 def is_empty(self) -> Any: 

245 # Returns True if this strategy can never draw a value and will always 

246 # result in the data being marked invalid. 

247 # The fact that this returns False does not guarantee that a valid value 

248 # can be drawn - this is not intended to be perfect, and is primarily 

249 # intended to be an optimisation for some cases. 

250 return recursive_property(self, "is_empty", True) 

251 

252 @property 

253 def supports_find(self) -> bool: 

254 return True 

255 

256 # Returns True if values from this strategy can safely be reused without 

257 # this causing unexpected behaviour. 

258 

259 # True if values from this strategy can be implicitly reused (e.g. as 

260 # background values in a numpy array) without causing surprising 

261 # user-visible behaviour. Should be false for built-in strategies that 

262 # produce mutable values, and for strategies that have been mapped/filtered 

263 # by arbitrary user-provided functions. 

264 @property 

265 def has_reusable_values(self) -> Any: 

266 return recursive_property(self, "has_reusable_values", True) 

267 

268 # Whether this strategy is suitable for holding onto in a cache. 

269 @property 

270 def is_cacheable(self) -> Any: 

271 return recursive_property(self, "is_cacheable", True) 

272 

273 def calc_is_cacheable(self, recur: RecurT) -> bool: 

274 return True 

275 

276 def calc_is_empty(self, recur: RecurT) -> bool: 

277 # Note: It is correct and significant that the default return value 

278 # from calc_is_empty is False despite the default value for is_empty 

279 # being true. The reason for this is that strategies should be treated 

280 # as empty absent evidence to the contrary, but most basic strategies 

281 # are trivially non-empty and it would be annoying to have to override 

282 # this method to show that. 

283 return False 

284 

285 def calc_has_reusable_values(self, recur: RecurT) -> bool: 

286 return False 

287 

288 def example(self) -> Ex: # FIXME 

289 """Provide an example of the sort of value that this strategy generates. 

290 

291 This method is designed for use in a REPL, and will raise an error if 

292 called from inside |@given| or a strategy definition. For serious use, 

293 see |@composite| or |st.data|. 

294 """ 

295 if getattr(sys, "ps1", None) is None: # pragma: no branch 

296 # The other branch *is* covered in cover/test_examples.py; but as that 

297 # uses `pexpect` for an interactive session `coverage` doesn't see it. 

298 warnings.warn( 

299 "The `.example()` method is good for exploring strategies, but should " 

300 "only be used interactively. We recommend using `@given` for tests - " 

301 "it performs better, saves and replays failures to avoid flakiness, " 

302 f"and reports minimal examples. (strategy: {self!r})", 

303 NonInteractiveExampleWarning, 

304 stacklevel=2, 

305 ) 

306 

307 context = _current_build_context.value 

308 if context is not None: 

309 if context.data is not None and context.data.depth > 0: 

310 raise HypothesisException( 

311 "Using example() inside a strategy definition is a bad " 

312 "idea. Instead consider using hypothesis.strategies.builds() " 

313 "or @hypothesis.strategies.composite to define your strategy." 

314 " See https://hypothesis.readthedocs.io/en/latest/data.html" 

315 "#hypothesis.strategies.builds or " 

316 "https://hypothesis.readthedocs.io/en/latest/data.html" 

317 "#composite-strategies for more details." 

318 ) 

319 else: 

320 raise HypothesisException( 

321 "Using example() inside a test function is a bad " 

322 "idea. Instead consider using hypothesis.strategies.data() " 

323 "to draw more examples during testing. See " 

324 "https://hypothesis.readthedocs.io/en/latest/data.html" 

325 "#drawing-interactively-in-tests for more details." 

326 ) 

327 

328 try: 

329 return self.__examples.pop() 

330 except (AttributeError, IndexError): 

331 self.__examples: list[Ex] = [] 

332 

333 from hypothesis.core import given 

334 

335 # Note: this function has a weird name because it might appear in 

336 # tracebacks, and we want users to know that they can ignore it. 

337 @given(self) 

338 @settings( 

339 database=None, 

340 # generate only a few examples at a time to avoid slow interactivity 

341 # for large strategies. The overhead of @given is very small relative 

342 # to generation, so a small batch size is fine. 

343 max_examples=10, 

344 deadline=None, 

345 verbosity=Verbosity.quiet, 

346 phases=(Phase.generate,), 

347 suppress_health_check=list(HealthCheck), 

348 ) 

349 def example_generating_inner_function( 

350 ex: Ex, # type: ignore # mypy is overzealous in preventing covariant params 

351 ) -> None: 

352 self.__examples.append(ex) 

353 

354 example_generating_inner_function() 

355 shuffle(self.__examples) 

356 return self.__examples.pop() 

357 

358 def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]": 

359 """Returns a new strategy which generates a value from this one, and 

360 then returns ``pack(value)``. For example, ``integers().map(str)`` 

361 could generate ``str(5)`` == ``"5"``. 

362 """ 

363 if is_identity_function(pack): 

364 return self # type: ignore # Mypy has no way to know that `Ex == T` 

365 return MappedStrategy(self, pack=pack) 

366 

367 def flatmap( 

368 self, expand: Callable[[Ex], "SearchStrategy[T]"] 

369 ) -> "SearchStrategy[T]": # FIXME 

370 """Old syntax for a special case of |@composite|: 

371 

372 .. code-block:: python 

373 

374 @st.composite 

375 def flatmap_like(draw, base_strategy, expand): 

376 value = draw(base_strategy) 

377 new_strategy = expand(value) 

378 return draw(new_strategy) 

379 

380 We find that the greater readability of |@composite| usually outweighs 

381 the verbosity, with a few exceptions for simple cases or recipes like 

382 ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for 

383 any instance of that type, and then generate one of those"). 

384 """ 

385 from hypothesis.strategies._internal.flatmapped import FlatMapStrategy 

386 

387 return FlatMapStrategy(self, expand=expand) 

388 

389 # Note that we previously had condition extracted to a type alias as 

390 # PredicateT. However, that was only useful when not specifying a relationship 

391 # between the generic Ts and some other function param / return value. 

392 # If we do want to - like here, where we want to say that the Ex arg to condition 

393 # is of the same type as the strategy's Ex - then you need to write out the 

394 # entire Callable[[Ex], Any] expression rather than use a type alias. 

395 # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not 

396 # reference the local TypeVar context. 

397 def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]": 

398 """Returns a new strategy that generates values from this strategy 

399 which satisfy the provided condition. 

400 

401 Note that if the condition is too hard to satisfy this might result 

402 in your tests failing with an Unsatisfiable exception. 

403 A basic version of the filtering logic would look something like: 

404 

405 .. code-block:: python 

406 

407 @st.composite 

408 def filter_like(draw, strategy, condition): 

409 for _ in range(3): 

410 value = draw(strategy) 

411 if condition(value): 

412 return value 

413 assume(False) 

414 """ 

415 return FilteredStrategy(conditions=(condition,), strategy=self) 

416 

417 def _filter_for_filtered_draw( 

418 self, condition: Callable[[Ex], Any] 

419 ) -> "FilteredStrategy[Ex]": 

420 # Hook for parent strategies that want to perform fallible filtering 

421 # on one of their internal strategies (e.g. UniqueListStrategy). 

422 # The returned object must have a `.do_filtered_draw(data)` method 

423 # that behaves like `do_draw`, but returns the sentinel object 

424 # `filter_not_satisfied` if the condition could not be satisfied. 

425 

426 # This is separate from the main `filter` method so that strategies 

427 # can override `filter` without having to also guarantee a 

428 # `do_filtered_draw` method. 

429 return FilteredStrategy(conditions=(condition,), strategy=self) 

430 

431 @property 

432 def branches(self) -> Sequence["SearchStrategy[Ex]"]: 

433 return [self] 

434 

435 def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]": 

436 """Return a strategy which produces values by randomly drawing from one 

437 of this strategy or the other strategy. 

438 

439 This method is part of the public API. 

440 """ 

441 if not isinstance(other, SearchStrategy): 

442 raise ValueError(f"Cannot | a SearchStrategy with {other!r}") 

443 

444 # Unwrap explicitly or'd strategies. This turns the 

445 # common case of e.g. st.integers() | st.integers() | st.integers() from 

446 # 

447 # one_of(one_of(integers(), integers()), integers()) 

448 # 

449 # into 

450 # 

451 # one_of(integers(), integers(), integers()) 

452 # 

453 # This is purely an aesthetic unwrapping, for e.g. reprs. In practice 

454 # we use .branches / .element_strategies to get the list of possible 

455 # strategies, so this unwrapping is *not* necessary for correctness. 

456 strategies: list[SearchStrategy] = [] 

457 strategies.extend( 

458 self.original_strategies if isinstance(self, OneOfStrategy) else [self] 

459 ) 

460 strategies.extend( 

461 other.original_strategies if isinstance(other, OneOfStrategy) else [other] 

462 ) 

463 return OneOfStrategy(strategies) 

464 

465 def __bool__(self) -> bool: 

466 warnings.warn( 

467 f"bool({self!r}) is always True, did you mean to draw a value?", 

468 HypothesisWarning, 

469 stacklevel=2, 

470 ) 

471 return True 

472 

473 def validate(self) -> None: 

474 """Throw an exception if the strategy is not valid. 

475 

476 This can happen due to lazy construction 

477 """ 

478 if self.validate_called: 

479 return 

480 try: 

481 self.validate_called = True 

482 self.do_validate() 

483 self.is_empty 

484 self.has_reusable_values 

485 except Exception: 

486 self.validate_called = False 

487 raise 

488 

489 LABELS: ClassVar[dict[type, int]] = {} 

490 

491 @property 

492 def class_label(self) -> int: 

493 cls = self.__class__ 

494 try: 

495 return cls.LABELS[cls] 

496 except KeyError: 

497 pass 

498 result = calc_label_from_cls(cls) 

499 cls.LABELS[cls] = result 

500 return result 

501 

502 @property 

503 def label(self) -> int: 

504 if self.__label is calculating: 

505 return 0 

506 if self.__label is None: 

507 self.__label = calculating 

508 self.__label = self.calc_label() 

509 return cast(int, self.__label) 

510 

511 def calc_label(self) -> int: 

512 return self.class_label 

513 

514 def do_validate(self) -> None: 

515 pass 

516 

517 def do_draw(self, data: ConjectureData) -> Ex: 

518 raise NotImplementedError(f"{type(self).__name__}.do_draw") 

519 

520 

521def is_hashable(value: object) -> bool: 

522 try: 

523 hash(value) 

524 return True 

525 except TypeError: 

526 return False 

527 

528 

529class SampledFromStrategy(SearchStrategy[Ex]): 

530 """A strategy which samples from a set of elements. This is essentially 

531 equivalent to using a OneOfStrategy over Just strategies but may be more 

532 efficient and convenient. 

533 """ 

534 

535 _MAX_FILTER_CALLS: ClassVar[int] = 10_000 

536 

537 def __init__( 

538 self, 

539 elements: Sequence[Ex], 

540 *, 

541 force_repr: Optional[str] = None, 

542 force_repr_braces: Optional[tuple[str, str]] = None, 

543 transformations: tuple[ 

544 tuple[Literal["filter", "map"], Callable[[Ex], Any]], 

545 ..., 

546 ] = (), 

547 ): 

548 super().__init__() 

549 self.elements = cu.check_sample(elements, "sampled_from") 

550 assert self.elements 

551 self.force_repr = force_repr 

552 self.force_repr_braces = force_repr_braces 

553 self._transformations = transformations 

554 

555 self._cached_repr: Optional[str] = None 

556 

557 def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]: 

558 s = type(self)( 

559 self.elements, 

560 force_repr=self.force_repr, 

561 force_repr_braces=self.force_repr_braces, 

562 transformations=(*self._transformations, ("map", pack)), 

563 ) 

564 # guaranteed by the ("map", pack) transformation 

565 return cast(SearchStrategy[T], s) 

566 

567 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]: 

568 return type(self)( 

569 self.elements, 

570 force_repr=self.force_repr, 

571 force_repr_braces=self.force_repr_braces, 

572 transformations=(*self._transformations, ("filter", condition)), 

573 ) 

574 

575 def __repr__(self): 

576 if self._cached_repr is None: 

577 rep = get_pretty_function_description 

578 elements_s = ( 

579 ", ".join(rep(v) for v in self.elements[:512]) + ", ..." 

580 if len(self.elements) > 512 

581 else ", ".join(rep(v) for v in self.elements) 

582 ) 

583 braces = self.force_repr_braces or ("(", ")") 

584 instance_s = ( 

585 self.force_repr or f"sampled_from({braces[0]}{elements_s}{braces[1]})" 

586 ) 

587 transforms_s = "".join( 

588 f".{name}({get_pretty_function_description(f)})" 

589 for name, f in self._transformations 

590 ) 

591 repr_s = instance_s + transforms_s 

592 self._cached_repr = repr_s 

593 return self._cached_repr 

594 

595 def calc_label(self) -> int: 

596 # strategy.label is effectively an under-approximation of structural 

597 # equality (i.e., some strategies may have the same label when they are not 

598 # structurally identical). More importantly for calculating the 

599 # SampledFromStrategy label, we might have hash(s1) != hash(s2) even 

600 # when s1 and s2 are structurally identical. For instance: 

601 # 

602 # s1 = st.sampled_from([st.none()]) 

603 # s2 = st.sampled_from([st.none()]) 

604 # assert hash(s1) != hash(s2) 

605 # 

606 # (see also test cases in test_labels.py). 

607 # 

608 # We therefore use the labels of any component strategies when calculating 

609 # our label, and only use the hash if it is not a strategy. 

610 # 

611 # That's the ideal, anyway. In reality the logic is more complicated than 

612 # necessary in order to be efficient in the presence of (very) large sequences: 

613 # * add an unabashed special case for range, to avoid iteration over an 

614 # enormous range when we know it is entirely integers. 

615 # * if there is at least one strategy in self.elements, use strategy label, 

616 # and the element hash otherwise. 

617 # * if there are no strategies in self.elements, take the hash of the 

618 # entire sequence. This prevents worst-case performance of hashing each 

619 # element when a hash of the entire sequence would have sufficed. 

620 # 

621 # The worst case performance of this scheme is 

622 # itertools.chain(range(2**100), [st.none()]), where it degrades to 

623 # hashing every int in the range. 

624 

625 if isinstance(self.elements, range) or ( 

626 is_hashable(self.elements) 

627 and not any(isinstance(e, SearchStrategy) for e in self.elements) 

628 ): 

629 return combine_labels(self.class_label, calc_label_from_hash(self.elements)) 

630 

631 labels = [self.class_label] 

632 for element in self.elements: 

633 if not is_hashable(element): 

634 continue 

635 

636 labels.append( 

637 element.label 

638 if isinstance(element, SearchStrategy) 

639 else calc_label_from_hash(element) 

640 ) 

641 

642 return combine_labels(*labels) 

643 

644 def calc_has_reusable_values(self, recur: RecurT) -> bool: 

645 # Because our custom .map/.filter implementations skip the normal 

646 # wrapper strategies (which would automatically return False for us), 

647 # we need to manually return False here if any transformations have 

648 # been applied. 

649 return not self._transformations 

650 

651 def calc_is_cacheable(self, recur: RecurT) -> bool: 

652 return is_hashable(self.elements) 

653 

654 def _transform( 

655 self, 

656 # https://github.com/python/mypy/issues/7049, we're not writing `element` 

657 # anywhere in the class so this is still type-safe. mypy is being more 

658 # conservative than necessary 

659 element: Ex, # type: ignore 

660 ) -> Union[Ex, UniqueIdentifier]: 

661 # Used in UniqueSampledListStrategy 

662 for name, f in self._transformations: 

663 if name == "map": 

664 result = f(element) 

665 if build_context := _current_build_context.value: 

666 build_context.record_call(result, f, [element], {}) 

667 element = result 

668 else: 

669 assert name == "filter" 

670 if not f(element): 

671 return filter_not_satisfied 

672 return element 

673 

674 def do_draw(self, data: ConjectureData) -> Ex: 

675 result = self.do_filtered_draw(data) 

676 if isinstance(result, SearchStrategy) and all( 

677 isinstance(x, SearchStrategy) for x in self.elements 

678 ): 

679 data._sampled_from_all_strategies_elements_message = ( 

680 "sample_from was given a collection of strategies: " 

681 "{!r}. Was one_of intended?", 

682 self.elements, 

683 ) 

684 if result is filter_not_satisfied: 

685 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}") 

686 assert not isinstance(result, UniqueIdentifier) 

687 return result 

688 

689 def get_element(self, i: int) -> Union[Ex, UniqueIdentifier]: 

690 return self._transform(self.elements[i]) 

691 

692 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]: 

693 # Set of indices that have been tried so far, so that we never test 

694 # the same element twice during a draw. 

695 known_bad_indices: set[int] = set() 

696 

697 # Start with ordinary rejection sampling. It's fast if it works, and 

698 # if it doesn't work then it was only a small amount of overhead. 

699 for _ in range(3): 

700 i = data.draw_integer(0, len(self.elements) - 1) 

701 if i not in known_bad_indices: 

702 element = self.get_element(i) 

703 if element is not filter_not_satisfied: 

704 return element 

705 if not known_bad_indices: 

706 data.events[f"Retried draw from {self!r} to satisfy filter"] = "" 

707 known_bad_indices.add(i) 

708 

709 # If we've tried all the possible elements, give up now. 

710 max_good_indices = len(self.elements) - len(known_bad_indices) 

711 if not max_good_indices: 

712 return filter_not_satisfied 

713 

714 # Impose an arbitrary cutoff to prevent us from wasting too much time 

715 # on very large element lists. 

716 max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3) 

717 

718 # Before building the list of allowed indices, speculatively choose 

719 # one of them. We don't yet know how many allowed indices there will be, 

720 # so this choice might be out-of-bounds, but that's OK. 

721 speculative_index = data.draw_integer(0, max_good_indices - 1) 

722 

723 # Calculate the indices of allowed values, so that we can choose one 

724 # of them at random. But if we encounter the speculatively-chosen one, 

725 # just use that and return immediately. Note that we also track the 

726 # allowed elements, in case of .map(some_stateful_function) 

727 allowed: list[tuple[int, Ex]] = [] 

728 for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)): 

729 if i not in known_bad_indices: 

730 element = self.get_element(i) 

731 if element is not filter_not_satisfied: 

732 assert not isinstance(element, UniqueIdentifier) 

733 allowed.append((i, element)) 

734 if len(allowed) > speculative_index: 

735 # Early-exit case: We reached the speculative index, so 

736 # we just return the corresponding element. 

737 data.draw_integer(0, len(self.elements) - 1, forced=i) 

738 return element 

739 

740 # The speculative index didn't work out, but at this point we've built 

741 # and can choose from the complete list of allowed indices and elements. 

742 if allowed: 

743 i, element = data.choice(allowed) 

744 data.draw_integer(0, len(self.elements) - 1, forced=i) 

745 return element 

746 # If there are no allowed indices, the filter couldn't be satisfied. 

747 return filter_not_satisfied 

748 

749 

750class OneOfStrategy(SearchStrategy[Ex]): 

751 """Implements a union of strategies. Given a number of strategies this 

752 generates values which could have come from any of them. 

753 

754 The conditional distribution draws uniformly at random from some 

755 non-empty subset of these strategies and then draws from the 

756 conditional distribution of that strategy. 

757 """ 

758 

759 def __init__(self, strategies: Sequence[SearchStrategy[Ex]]): 

760 super().__init__() 

761 self.original_strategies = tuple(strategies) 

762 self.__element_strategies: Optional[Sequence[SearchStrategy[Ex]]] = None 

763 self.__in_branches = False 

764 

765 def calc_is_empty(self, recur: RecurT) -> bool: 

766 return all(recur(e) for e in self.original_strategies) 

767 

768 def calc_has_reusable_values(self, recur: RecurT) -> bool: 

769 return all(recur(e) for e in self.original_strategies) 

770 

771 def calc_is_cacheable(self, recur: RecurT) -> bool: 

772 return all(recur(e) for e in self.original_strategies) 

773 

774 @property 

775 def element_strategies(self) -> Sequence[SearchStrategy[Ex]]: 

776 if self.__element_strategies is None: 

777 # While strategies are hashable, they use object.__hash__ and are 

778 # therefore distinguished only by identity. 

779 # 

780 # In principle we could "just" define a __hash__ method 

781 # (and __eq__, but that's easy in terms of type() and hash()) 

782 # to make this more powerful, but this is harder than it sounds: 

783 # 

784 # 1. Strategies are often distinguished by non-hashable attributes, 

785 # or by attributes that have the same hash value ("^.+" / b"^.+"). 

786 # 2. LazyStrategy: can't reify the wrapped strategy without breaking 

787 # laziness, so there's a hash each for the lazy and the nonlazy. 

788 # 

789 # Having made several attempts, the minor benefits of making strategies 

790 # hashable are simply not worth the engineering effort it would take. 

791 # See also issues #2291 and #2327. 

792 seen: set[SearchStrategy] = {self} 

793 strategies: list[SearchStrategy] = [] 

794 for arg in self.original_strategies: 

795 check_strategy(arg) 

796 if not arg.is_empty: 

797 for s in arg.branches: 

798 if s not in seen and not s.is_empty: 

799 seen.add(s) 

800 strategies.append(s) 

801 self.__element_strategies = strategies 

802 return self.__element_strategies 

803 

804 def calc_label(self) -> int: 

805 return combine_labels( 

806 self.class_label, *(p.label for p in self.original_strategies) 

807 ) 

808 

809 def do_draw(self, data: ConjectureData) -> Ex: 

810 strategy = data.draw( 

811 SampledFromStrategy(self.element_strategies).filter( 

812 lambda s: s._available(data) 

813 ) 

814 ) 

815 return data.draw(strategy) 

816 

817 def __repr__(self) -> str: 

818 return "one_of({})".format(", ".join(map(repr, self.original_strategies))) 

819 

820 def do_validate(self) -> None: 

821 for e in self.element_strategies: 

822 e.validate() 

823 

824 @property 

825 def branches(self) -> Sequence[SearchStrategy[Ex]]: 

826 if not self.__in_branches: 

827 try: 

828 self.__in_branches = True 

829 return self.element_strategies 

830 finally: 

831 self.__in_branches = False 

832 else: 

833 return [self] 

834 

835 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]: 

836 return FilteredStrategy( 

837 OneOfStrategy([s.filter(condition) for s in self.original_strategies]), 

838 conditions=(), 

839 ) 

840 

841 

842@overload 

843def one_of( 

844 __args: Sequence[SearchStrategy[Ex]], 

845) -> SearchStrategy[Ex]: # pragma: no cover 

846 ... 

847 

848 

849@overload 

850def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]: # pragma: no cover 

851 ... 

852 

853 

854@overload 

855def one_of( 

856 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T] 

857) -> SearchStrategy[Union[Ex, T]]: # pragma: no cover 

858 ... 

859 

860 

861@overload 

862def one_of( 

863 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3] 

864) -> SearchStrategy[Union[Ex, T, T3]]: # pragma: no cover 

865 ... 

866 

867 

868@overload 

869def one_of( 

870 __a1: SearchStrategy[Ex], 

871 __a2: SearchStrategy[T], 

872 __a3: SearchStrategy[T3], 

873 __a4: SearchStrategy[T4], 

874) -> SearchStrategy[Union[Ex, T, T3, T4]]: # pragma: no cover 

875 ... 

876 

877 

878@overload 

879def one_of( 

880 __a1: SearchStrategy[Ex], 

881 __a2: SearchStrategy[T], 

882 __a3: SearchStrategy[T3], 

883 __a4: SearchStrategy[T4], 

884 __a5: SearchStrategy[T5], 

885) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]: # pragma: no cover 

886 ... 

887 

888 

889@overload 

890def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]: # pragma: no cover 

891 ... 

892 

893 

894@defines_strategy(never_lazy=True) 

895def one_of( 

896 *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]] 

897) -> SearchStrategy[Any]: 

898 # Mypy workaround alert: Any is too loose above; the return parameter 

899 # should be the union of the input parameters. Unfortunately, Mypy <=0.600 

900 # raises errors due to incompatible inputs instead. See #1270 for links. 

901 # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead. 

902 """Return a strategy which generates values from any of the argument 

903 strategies. 

904 

905 This may be called with one iterable argument instead of multiple 

906 strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are 

907 equivalent. 

908 

909 Examples from this strategy will generally shrink to ones that come from 

910 strategies earlier in the list, then shrink according to behaviour of the 

911 strategy that produced them. In order to get good shrinking behaviour, 

912 try to put simpler strategies first. e.g. ``one_of(none(), text())`` is 

913 better than ``one_of(text(), none())``. 

914 

915 This is especially important when using recursive strategies. e.g. 

916 ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well, 

917 but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink 

918 very badly indeed. 

919 """ 

920 if len(args) == 1 and not isinstance(args[0], SearchStrategy): 

921 try: 

922 args = tuple(args[0]) 

923 except TypeError: 

924 pass 

925 if len(args) == 1 and isinstance(args[0], SearchStrategy): 

926 # This special-case means that we can one_of over lists of any size 

927 # without incurring any performance overhead when there is only one 

928 # strategy, and keeps our reprs simple. 

929 return args[0] 

930 if args and not any(isinstance(a, SearchStrategy) for a in args): 

931 # And this special case is to give a more-specific error message if it 

932 # seems that the user has confused `one_of()` for `sampled_from()`; 

933 # the remaining validation is left to OneOfStrategy. See PR #2627. 

934 raise InvalidArgument( 

935 f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used " 

936 "to combine strategies, but all of the arguments were of other types." 

937 ) 

938 # we've handled the case where args is a one-element sequence [(s1, s2, ...)] 

939 # above, so we can assume it's an actual sequence of strategies. 

940 args = cast(Sequence[SearchStrategy], args) 

941 return OneOfStrategy(args) 

942 

943 

944class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]): 

945 """A strategy which is defined purely by conversion to and from another 

946 strategy. 

947 

948 Its parameter and distribution come from that other strategy. 

949 """ 

950 

951 def __init__( 

952 self, 

953 strategy: SearchStrategy[MappedFrom], 

954 pack: Callable[[MappedFrom], MappedTo], 

955 ) -> None: 

956 super().__init__() 

957 self.mapped_strategy = strategy 

958 self.pack = pack 

959 

960 def calc_is_empty(self, recur: RecurT) -> bool: 

961 return recur(self.mapped_strategy) 

962 

963 def calc_is_cacheable(self, recur: RecurT) -> bool: 

964 return recur(self.mapped_strategy) 

965 

966 def __repr__(self) -> str: 

967 if not hasattr(self, "_cached_repr"): 

968 self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})" 

969 return self._cached_repr 

970 

971 def do_validate(self) -> None: 

972 self.mapped_strategy.validate() 

973 

974 def do_draw(self, data: ConjectureData) -> MappedTo: 

975 with warnings.catch_warnings(): 

976 if isinstance(self.pack, type) and issubclass( 

977 self.pack, (abc.Mapping, abc.Set) 

978 ): 

979 warnings.simplefilter("ignore", BytesWarning) 

980 for _ in range(3): 

981 try: 

982 data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL) 

983 x = data.draw(self.mapped_strategy) 

984 result = self.pack(x) 

985 data.stop_span() 

986 current_build_context().record_call(result, self.pack, [x], {}) 

987 return result 

988 except UnsatisfiedAssumption: 

989 data.stop_span(discard=True) 

990 raise UnsatisfiedAssumption 

991 

992 @property 

993 def branches(self) -> Sequence[SearchStrategy[MappedTo]]: 

994 return [ 

995 MappedStrategy(strategy, pack=self.pack) 

996 for strategy in self.mapped_strategy.branches 

997 ] 

998 

999 def filter( 

1000 self, condition: Callable[[MappedTo], Any] 

1001 ) -> "SearchStrategy[MappedTo]": 

1002 # Includes a special case so that we can rewrite filters on collection 

1003 # lengths, when most collections are `st.lists(...).map(the_type)`. 

1004 ListStrategy = _list_strategy_type() 

1005 if not isinstance(self.mapped_strategy, ListStrategy) or not ( 

1006 (isinstance(self.pack, type) and issubclass(self.pack, abc.Collection)) 

1007 or self.pack in _collection_ish_functions() 

1008 ): 

1009 return super().filter(condition) 

1010 

1011 # Check whether our inner list strategy can rewrite this filter condition. 

1012 # If not, discard the result and _only_ apply a new outer filter. 

1013 new = ListStrategy.filter(self.mapped_strategy, condition) 

1014 if getattr(new, "filtered_strategy", None) is self.mapped_strategy: 

1015 return super().filter(condition) # didn't rewrite 

1016 

1017 # Apply a new outer filter even though we rewrote the inner strategy, 

1018 # because some collections can change the list length (dict, set, etc). 

1019 return FilteredStrategy(type(self)(new, self.pack), conditions=(condition,)) 

1020 

1021 

1022@lru_cache 

1023def _list_strategy_type() -> Any: 

1024 from hypothesis.strategies._internal.collections import ListStrategy 

1025 

1026 return ListStrategy 

1027 

1028 

1029def _collection_ish_functions() -> Sequence[Any]: 

1030 funcs = [sorted] 

1031 if np := sys.modules.get("numpy"): 

1032 # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html 

1033 # Probably only `np.array` and `np.asarray` will be used in practice, 

1034 # but why should that stop us when we've already gone this far? 

1035 funcs += [ 

1036 np.empty_like, 

1037 np.eye, 

1038 np.identity, 

1039 np.ones_like, 

1040 np.zeros_like, 

1041 np.array, 

1042 np.asarray, 

1043 np.asanyarray, 

1044 np.ascontiguousarray, 

1045 np.asmatrix, 

1046 np.copy, 

1047 np.rec.array, 

1048 np.rec.fromarrays, 

1049 np.rec.fromrecords, 

1050 np.diag, 

1051 # bonus undocumented functions from tab-completion: 

1052 np.asarray_chkfinite, 

1053 np.asfortranarray, 

1054 ] 

1055 

1056 return funcs 

1057 

1058 

1059filter_not_satisfied = UniqueIdentifier("filter not satisfied") 

1060 

1061 

1062class FilteredStrategy(SearchStrategy[Ex]): 

1063 def __init__( 

1064 self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...] 

1065 ): 

1066 super().__init__() 

1067 if isinstance(strategy, FilteredStrategy): 

1068 # Flatten chained filters into a single filter with multiple conditions. 

1069 self.flat_conditions: tuple[Callable[[Ex], Any], ...] = ( 

1070 strategy.flat_conditions + conditions 

1071 ) 

1072 self.filtered_strategy: SearchStrategy[Ex] = strategy.filtered_strategy 

1073 else: 

1074 self.flat_conditions = conditions 

1075 self.filtered_strategy = strategy 

1076 

1077 assert isinstance(self.flat_conditions, tuple) 

1078 assert not isinstance(self.filtered_strategy, FilteredStrategy) 

1079 

1080 self.__condition: Optional[Callable[[Ex], Any]] = None 

1081 

1082 def calc_is_empty(self, recur: RecurT) -> bool: 

1083 return recur(self.filtered_strategy) 

1084 

1085 def calc_is_cacheable(self, recur: RecurT) -> bool: 

1086 return recur(self.filtered_strategy) 

1087 

1088 def __repr__(self) -> str: 

1089 if not hasattr(self, "_cached_repr"): 

1090 self._cached_repr = "{!r}{}".format( 

1091 self.filtered_strategy, 

1092 "".join( 

1093 f".filter({get_pretty_function_description(cond)})" 

1094 for cond in self.flat_conditions 

1095 ), 

1096 ) 

1097 return self._cached_repr 

1098 

1099 def do_validate(self) -> None: 

1100 # Start by validating our inner filtered_strategy. If this was a LazyStrategy, 

1101 # validation also reifies it so that subsequent calls to e.g. `.filter()` will 

1102 # be passed through. 

1103 self.filtered_strategy.validate() 

1104 # So now we have a reified inner strategy, we'll replay all our saved 

1105 # predicates in case some or all of them can be rewritten. Note that this 

1106 # replaces the `fresh` strategy too! 

1107 fresh = self.filtered_strategy 

1108 for cond in self.flat_conditions: 

1109 fresh = fresh.filter(cond) 

1110 if isinstance(fresh, FilteredStrategy): 

1111 # In this case we have at least some non-rewritten filter predicates, 

1112 # so we just re-initialize the strategy. 

1113 FilteredStrategy.__init__( 

1114 self, fresh.filtered_strategy, fresh.flat_conditions 

1115 ) 

1116 else: 

1117 # But if *all* the predicates were rewritten... well, do_validate() is 

1118 # an in-place method so we still just re-initialize the strategy! 

1119 FilteredStrategy.__init__(self, fresh, ()) 

1120 

1121 def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]": 

1122 # If we can, it's more efficient to rewrite our strategy to satisfy the 

1123 # condition. We therefore exploit the fact that the order of predicates 

1124 # doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by attempting to apply 

1125 # condition directly to our filtered strategy as the inner-most filter. 

1126 out = self.filtered_strategy.filter(condition) 

1127 # If it couldn't be rewritten, we'll get a new FilteredStrategy - and then 

1128 # combine the conditions of each in our expected newest=last order. 

1129 if isinstance(out, FilteredStrategy): 

1130 return FilteredStrategy( 

1131 out.filtered_strategy, self.flat_conditions + out.flat_conditions 

1132 ) 

1133 # But if it *could* be rewritten, we can return the more efficient form! 

1134 return FilteredStrategy(out, self.flat_conditions) 

1135 

1136 @property 

1137 def condition(self) -> Callable[[Ex], Any]: 

1138 if self.__condition is None: 

1139 if len(self.flat_conditions) == 1: 

1140 # Avoid an extra indirection in the common case of only one condition. 

1141 self.__condition = self.flat_conditions[0] 

1142 elif len(self.flat_conditions) == 0: 

1143 # Possible, if unlikely, due to filter predicate rewriting 

1144 self.__condition = lambda _: True # type: ignore # covariant type param 

1145 else: 

1146 self.__condition = lambda x: all( # type: ignore # covariant type param 

1147 cond(x) for cond in self.flat_conditions 

1148 ) 

1149 return self.__condition 

1150 

1151 def do_draw(self, data: ConjectureData) -> Ex: 

1152 result = self.do_filtered_draw(data) 

1153 if result is not filter_not_satisfied: 

1154 return cast(Ex, result) 

1155 

1156 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}") 

1157 

1158 def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]: 

1159 for i in range(3): 

1160 data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL) 

1161 value = data.draw(self.filtered_strategy) 

1162 if self.condition(value): 

1163 data.stop_span() 

1164 return value 

1165 else: 

1166 data.stop_span(discard=True) 

1167 if i == 0: 

1168 data.events[f"Retried draw from {self!r} to satisfy filter"] = "" 

1169 

1170 return filter_not_satisfied 

1171 

1172 @property 

1173 def branches(self) -> Sequence[SearchStrategy[Ex]]: 

1174 return [ 

1175 FilteredStrategy(strategy=strategy, conditions=self.flat_conditions) 

1176 for strategy in self.filtered_strategy.branches 

1177 ] 

1178 

1179 

1180@check_function 

1181def check_strategy(arg: object, name: str = "") -> None: 

1182 assert isinstance(name, str) 

1183 if not isinstance(arg, SearchStrategy): 

1184 hint = "" 

1185 if isinstance(arg, (list, tuple)): 

1186 hint = ", such as st.sampled_from({}),".format(name or "...") 

1187 if name: 

1188 name += "=" 

1189 raise InvalidArgument( 

1190 f"Expected a SearchStrategy{hint} but got {name}{arg!r} " 

1191 f"(type={type(arg).__name__})" 

1192 )