Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/strategies/_internal/strategies.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

494 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import sys 

12import threading 

13import warnings 

14from collections import abc, defaultdict 

15from collections.abc import Sequence 

16from functools import lru_cache 

17from random import shuffle 

18from threading import RLock 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 Callable, 

23 ClassVar, 

24 Generic, 

25 Literal, 

26 Optional, 

27 TypeVar, 

28 Union, 

29 cast, 

30 overload, 

31) 

32 

33from hypothesis._settings import HealthCheck, Phase, Verbosity, settings 

34from hypothesis.control import _current_build_context, current_build_context 

35from hypothesis.errors import ( 

36 HypothesisException, 

37 HypothesisWarning, 

38 InvalidArgument, 

39 NonInteractiveExampleWarning, 

40 UnsatisfiedAssumption, 

41) 

42from hypothesis.internal.conjecture import utils as cu 

43from hypothesis.internal.conjecture.data import ConjectureData 

44from hypothesis.internal.conjecture.utils import ( 

45 calc_label_from_cls, 

46 calc_label_from_hash, 

47 calc_label_from_name, 

48 combine_labels, 

49) 

50from hypothesis.internal.coverage import check_function 

51from hypothesis.internal.reflection import ( 

52 get_pretty_function_description, 

53 is_identity_function, 

54) 

55from hypothesis.strategies._internal.utils import defines_strategy 

56from hypothesis.utils.conventions import UniqueIdentifier 

57 

# Under type checking ``Ex`` is declared with a default of ``Any``; the runtime
# branch builds the same covariant TypeVar without the ``default=`` argument.
if TYPE_CHECKING:
    from typing import TypeAlias

    Ex = TypeVar("Ex", covariant=True, default=Any)
else:
    Ex = TypeVar("Ex", covariant=True)

T = TypeVar("T")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
MappedFrom = TypeVar("MappedFrom")
MappedTo = TypeVar("MappedTo")
# Signature of the ``recur`` callback handed to the ``calc_*`` methods.
RecurT: "TypeAlias" = Callable[["SearchStrategy"], bool]
# Sentinel marking a value whose computation is currently in progress (used by
# ``recursive_property`` and by ``SearchStrategy.label``).
calculating = UniqueIdentifier("calculating")

MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "another attempted draw in MappedStrategy"
)

FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "single loop iteration in FilteredStrategy"
)

# Held while ``SearchStrategy.label`` lazily computes and stores a label.
label_lock = RLock()

83 

84 

def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any:
    """Resolve a lazily cached strategy property that may be mutually recursive.

    The property ``name`` is read from ``strategy`` as ``force_<name>`` (an
    explicit override) or, failing that, ``cached_<name>`` (a memoized
    result). If neither is set, it is computed by calling ``calc_<name>`` on
    every strategy involved, and the results are stored as ``cached_<name>``
    on each strategy visited along the way - not only on ``strategy``.

    A naive recursive calculation can loop forever on self-referential
    strategies. For example ``x = st.deferred(lambda: x)`` is certainly empty
    (drawing from x requires drawing from x, ...), yet computing
    ``x.is_empty`` naively would call ``x.is_empty`` again. Instead we do a
    fixed point calculation: start from ``default``, the value assumed in the
    absence of evidence to the contrary, and keep re-evaluating dependent
    strategies until nothing changes.

    The approach taken roughly follows that in section 4.2 of Adams,
    Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
    and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
    (2016): 224-236.
    """
    assert name in {"is_empty", "has_reusable_values", "is_cacheable"}
    cached_attr = f"cached_{name}"
    calc_attr = f"calc_{name}"
    forced_attr = f"force_{name}"

    def known_value(target: "SearchStrategy") -> Any:
        # An explicit override wins over a memoized result. AttributeError
        # escapes when neither attribute has been set on ``target``.
        try:
            return getattr(target, forced_attr)
        except AttributeError:
            return getattr(target, cached_attr)

    try:
        return known_value(strategy)
    except AttributeError:
        pass

    values: dict[SearchStrategy, Any] = {}
    missing = object()
    hit_recursion = False

    # First pass: a direct recursive calculation. Re-entering a strategy that
    # is still being computed is not followed; we just remember that it
    # happened and answer with the default.
    def first_pass(strat: "SearchStrategy") -> Any:
        nonlocal hit_recursion
        try:
            return known_value(strat)
        except AttributeError:
            pass
        current = values.get(strat, missing)
        if current is missing:
            values[strat] = calculating
            computed = getattr(strat, calc_attr)(first_pass)
            values[strat] = computed
            return computed
        if current is calculating:
            hit_recursion = True
            return default
        return current

    first_pass(strategy)

    # If the first pass ran into self-recursion, the values collected so far
    # are only an approximation, and a proper fixed point iteration is needed.
    # The approximation is hopefully close, so few rounds should be required.
    if hit_recursion:
        pending = set(values)
        # listeners[B] holds every strategy A that consulted B while
        # computing its own value; when B changes, each such A is re-checked.
        listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set)
    else:
        pending = None

    def tracking_recur(strat: "SearchStrategy") -> Any:
        # Build the ``recur`` callback used while re-evaluating ``strat``.
        def lookup(other: "SearchStrategy") -> Any:
            try:
                return known_value(other)
            except AttributeError:
                pass
            listeners[other].add(strat)
            current = values.get(other, missing)
            if current is not missing:
                return current
            # First time we meet ``other``: assume the default and queue it.
            assert pending is not None
            pending.add(other)
            values[other] = default
            return default

        return lookup

    rounds = 0
    snapshots = set()
    while pending:
        rounds += 1
        # If stabilising takes suspiciously long, start recording snapshots
        # of the whole mapping so that a genuine infinite loop trips an
        # assertion instead of hanging. This should be impossible in practice.
        # Note: This is actually covered, by test_very_deep_deferral
        # in tests/cover/test_deferred_strategies.py. Unfortunately it
        # runs into a coverage bug. See
        # https://github.com/nedbat/coveragepy/issues/605
        # for details.
        if rounds > 50:  # pragma: no cover
            snapshot = frozenset(values.items())
            assert snapshot not in snapshots, (snapshot, name)
            snapshots.add(snapshot)
        # ``lookup`` above adds to whatever ``pending`` is bound to now, i.e.
        # the fresh set for the next round, never the one being iterated.
        current_round, pending = pending, set()
        for strat in current_round:
            recomputed = getattr(strat, calc_attr)(tracking_recur(strat))
            if recomputed != values[strat]:
                pending.update(listeners[strat])
                values[strat] = recomputed

    # Everything seen during this calculation now has an accurate value, so
    # cache all of them at once (not just the strategy we started from).
    for strat, value in values.items():
        setattr(strat, cached_attr, value)
    return getattr(strategy, cached_attr)

222 

223 

class SearchStrategy(Generic[Ex]):
    """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input.

    This class is only part of the public API for use in type annotations, so that
    you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns
    ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class.
    """

    __module__: str = "hypothesis.strategies"
    # Per-class cache of ``calc_label_from_cls`` results, shared by all subclasses.
    LABELS: ClassVar[dict[type, int]] = {}
    # triggers `assert isinstance(label, int)` under threading when setting this
    # in init instead of a classvar. I'm not sure why, init should be safe. But
    # this works so I'm not looking into it further atm.
    __label: Union[int, UniqueIdentifier, None] = None

    def __init__(self):
        # Maps thread ident -> whether validate() has started on that thread.
        self.validate_called: dict[int, bool] = {}

    def is_currently_empty(self, data: ConjectureData) -> bool:
        """
        Returns whether this strategy is currently empty. Unlike ``is_empty``,
        which is computed based on static information and cannot change,
        ``is_currently_empty`` may change over time based on choices made
        during the test case.

        This is currently only used for stateful testing, where |Bundle| grows a
        list of values to choose from over the course of a test case.

        ``data`` will only be used for introspection. No values will be drawn
        from it in a way that modifies the choice sequence.
        """
        return self.is_empty

    @property
    def is_empty(self) -> Any:
        # Returns True if this strategy can never draw a value and will always
        # result in the data being marked invalid.
        # The fact that this returns False does not guarantee that a valid value
        # can be drawn - this is not intended to be perfect, and is primarily
        # intended to be an optimisation for some cases.
        return recursive_property(self, "is_empty", True)

    # True if values from this strategy can be implicitly reused (e.g. as
    # background values in a numpy array) without causing surprising
    # user-visible behaviour. Should be false for built-in strategies that
    # produce mutable values, and for strategies that have been mapped/filtered
    # by arbitrary user-provided functions.
    @property
    def has_reusable_values(self) -> Any:
        return recursive_property(self, "has_reusable_values", True)

    # Whether this strategy is suitable for holding onto in a cache.
    @property
    def is_cacheable(self) -> Any:
        return recursive_property(self, "is_cacheable", True)

    # The calc_* methods below are the per-strategy hooks that
    # recursive_property invokes as ``calc_<name>(recur)``; ``recur`` yields
    # the same property for another strategy.

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return True

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Note: It is correct and significant that the default return value
        # from calc_is_empty is False despite the default value for is_empty
        # being true. The reason for this is that strategies should be treated
        # as empty absent evidence to the contrary, but most basic strategies
        # are trivially non-empty and it would be annoying to have to override
        # this method to show that.
        return False

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        return False

    def example(self) -> Ex:  # FIXME
        """Provide an example of the sort of value that this strategy generates.

        This method is designed for use in a REPL, and will raise an error if
        called from inside |@given| or a strategy definition.  For serious use,
        see |@composite| or |st.data|.
        """
        if getattr(sys, "ps1", None) is None:  # pragma: no branch
            # The other branch *is* covered in cover/test_examples.py; but as that
            # uses `pexpect` for an interactive session `coverage` doesn't see it.
            warnings.warn(
                "The `.example()` method is good for exploring strategies, but should "
                "only be used interactively.  We recommend using `@given` for tests - "
                "it performs better, saves and replays failures to avoid flakiness, "
                f"and reports minimal examples. (strategy: {self!r})",
                NonInteractiveExampleWarning,
                stacklevel=2,
            )

        # A non-None build context means we are being called while a test (or
        # a strategy draw within one) is running, which is refused below.
        context = _current_build_context.value
        if context is not None:
            if context.data is not None and context.data.depth > 0:
                raise HypothesisException(
                    "Using example() inside a strategy definition is a bad "
                    "idea. Instead consider using hypothesis.strategies.builds() "
                    "or @hypothesis.strategies.composite to define your strategy."
                    " See https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#hypothesis.strategies.builds or "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#composite-strategies for more details."
                )
            else:
                raise HypothesisException(
                    "Using example() inside a test function is a bad "
                    "idea. Instead consider using hypothesis.strategies.data() "
                    "to draw more examples during testing. See "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#drawing-interactively-in-tests for more details."
                )

        # Serve from the buffer left over by a previous call if there is one;
        # otherwise (no buffer yet, or it is exhausted) start a fresh batch.
        try:
            return self.__examples.pop()
        except (AttributeError, IndexError):
            self.__examples: list[Ex] = []

        from hypothesis.core import given

        # Note: this function has a weird name because it might appear in
        # tracebacks, and we want users to know that they can ignore it.
        @given(self)
        @settings(
            database=None,
            # generate only a few examples at a time to avoid slow interactivity
            # for large strategies. The overhead of @given is very small relative
            # to generation, so a small batch size is fine.
            max_examples=10,
            deadline=None,
            verbosity=Verbosity.quiet,
            phases=(Phase.generate,),
            suppress_health_check=list(HealthCheck),
        )
        def example_generating_inner_function(
            ex: Ex,  # type: ignore # mypy is overzealous in preventing covariant params
        ) -> None:
            self.__examples.append(ex)

        example_generating_inner_function()
        shuffle(self.__examples)
        return self.__examples.pop()

    def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]":
        """Returns a new strategy which generates a value from this one, and
        then returns ``pack(value)``.  For example, ``integers().map(str)``
        could generate ``str(5)`` == ``"5"``.
        """
        if is_identity_function(pack):
            return self  # type: ignore  # Mypy has no way to know that `Ex == T`
        return MappedStrategy(self, pack=pack)

    def flatmap(
        self, expand: Callable[[Ex], "SearchStrategy[T]"]
    ) -> "SearchStrategy[T]":  # FIXME
        """Old syntax for a special case of |@composite|:

        .. code-block:: python

            @st.composite
            def flatmap_like(draw, base_strategy, expand):
                value = draw(base_strategy)
                new_strategy = expand(value)
                return draw(new_strategy)

        We find that the greater readability of |@composite| usually outweighs
        the verbosity, with a few exceptions for simple cases or recipes like
        ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for
        any instance of that type, and then generate one of those").
        """
        from hypothesis.strategies._internal.flatmapped import FlatMapStrategy

        return FlatMapStrategy(self, expand=expand)

    # Note that we previously had condition extracted to a type alias as
    # PredicateT. However, that was only useful when not specifying a relationship
    # between the generic Ts and some other function param / return value.
    # If we do want to - like here, where we want to say that the Ex arg to condition
    # is of the same type as the strategy's Ex - then you need to write out the
    # entire Callable[[Ex], Any] expression rather than use a type alias.
    # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not
    # reference the local TypeVar context.
    def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]":
        """Returns a new strategy that generates values from this strategy
        which satisfy the provided condition.

        Note that if the condition is too hard to satisfy this might result
        in your tests failing with an Unsatisfiable exception.
        A basic version of the filtering logic would look something like:

        .. code-block:: python

            @st.composite
            def filter_like(draw, strategy, condition):
                for _ in range(3):
                    value = draw(strategy)
                    if condition(value):
                        return value
                assume(False)
        """
        return FilteredStrategy(self, conditions=(condition,))

    @property
    def branches(self) -> Sequence["SearchStrategy[Ex]"]:
        # The base implementation reports the strategy itself as its only branch.
        return [self]

    def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Union[Ex, T]]":
        """Return a strategy which produces values by randomly drawing from one
        of this strategy or the other strategy.

        This method is part of the public API.

        Raises ``ValueError`` if ``other`` is not a ``SearchStrategy``.
        """
        if not isinstance(other, SearchStrategy):
            raise ValueError(f"Cannot | a SearchStrategy with {other!r}")

        # Unwrap explicitly or'd strategies. This turns the
        # common case of e.g. st.integers() | st.integers() | st.integers() from
        #
        #     one_of(one_of(integers(), integers()), integers())
        #
        # into
        #
        #     one_of(integers(), integers(), integers())
        #
        # This is purely an aesthetic unwrapping, for e.g. reprs. In practice
        # we use .branches / .element_strategies to get the list of possible
        # strategies, so this unwrapping is *not* necessary for correctness.
        strategies: list[SearchStrategy] = []
        strategies.extend(
            self.original_strategies if isinstance(self, OneOfStrategy) else [self]
        )
        strategies.extend(
            other.original_strategies if isinstance(other, OneOfStrategy) else [other]
        )
        return OneOfStrategy(strategies)

    def __bool__(self) -> bool:
        # Truth-testing a strategy is almost certainly a mistake, so warn.
        warnings.warn(
            f"bool({self!r}) is always True, did you mean to draw a value?",
            HypothesisWarning,
            stacklevel=2,
        )
        return True

    def validate(self) -> None:
        """Throw an exception if the strategy is not valid.

        Strategies should implement ``do_validate``, which is called by this
        method. They should not override ``validate``.

        This can happen due to invalid arguments, or lazy construction.
        """
        thread_id = threading.get_ident()
        if self.validate_called.get(thread_id, False):
            return
        # we need to set validate_called before calling do_validate, for
        # recursive / deferred strategies. But if a thread switches after
        # validate_called but before do_validate, we might have a strategy
        # which does weird things like drawing when do_validate would error but
        # its params are technically valid (e.g. a param was passed as 1.0
        # instead of 1) and get into weird internal states.
        #
        # There are two ways to fix this.
        # (1) The first is a per-strategy lock around do_validate. Even though we
        #   expect near-zero lock contention, this still adds the lock overhead.
        # (2) The second is allowing concurrent .validate calls. Since validation
        #   is (assumed to be) deterministic, both threads will produce the same
        #   end state, so the validation order or race conditions does not matter.
        #
        # In order to avoid the lock overhead of (1), we use (2) here. See also
        # discussion in https://github.com/HypothesisWorks/hypothesis/pull/4473.
        try:
            self.validate_called[thread_id] = True
            self.do_validate()
            # Touch both properties so they are computed (and cached) now.
            self.is_empty
            self.has_reusable_values
        except Exception:
            # Roll back so that a later call validates again.
            self.validate_called[thread_id] = False
            raise

    @property
    def class_label(self) -> int:
        # Label derived from the strategy's class alone, memoized in LABELS.
        cls = self.__class__
        try:
            return cls.LABELS[cls]
        except KeyError:
            pass
        result = calc_label_from_cls(cls)
        cls.LABELS[cls] = result
        return result

    @property
    def label(self) -> int:
        # Lazily computed via calc_label() and then stored on the instance.
        if isinstance((label := self.__label), int):
            # avoid locking if we've already completely computed the label.
            return label

        with label_lock:
            if self.__label is calculating:
                # label_lock is reentrant, so we can only get here when
                # calc_label() has (indirectly) asked for this same label
                # again; break the cycle with a fixed placeholder.
                return 0
            self.__label = calculating
            try:
                self.__label = self.calc_label()
            except BaseException:
                # Never leave the in-progress sentinel behind: otherwise every
                # later access would silently return the placeholder 0. This
                # mirrors the rollback done in validate().
                self.__label = None
                raise
            return self.__label

    def calc_label(self) -> int:
        # Hook for subclasses; by default the label depends only on the class.
        return self.class_label

    def do_validate(self) -> None:
        # Hook for subclasses, called by validate(). Nothing to check by default.
        pass

    def do_draw(self, data: ConjectureData) -> Ex:
        # Subclasses must override this to actually produce a value.
        raise NotImplementedError(f"{type(self).__name__}.do_draw")

537 

538 

539def _is_hashable(value: object) -> tuple[bool, Optional[int]]: 

540 # hashing can be expensive; return the hash value if we compute it, so that 

541 # callers don't have to recompute. 

542 try: 

543 return (True, hash(value)) 

544 except TypeError: 

545 return (False, None) 

546 

547 

def is_hashable(value: object) -> bool:
    """Return whether ``hash(value)`` succeeds, discarding the hash itself."""
    hashable, _ = _is_hashable(value)
    return hashable

550 

551 

class SampledFromStrategy(SearchStrategy[Ex]):
    """A strategy which samples from a set of elements. This is essentially
    equivalent to using a OneOfStrategy over Just strategies but may be more
    efficient and convenient.
    """

    # Upper bound on how many elements a single filtered draw will examine.
    _MAX_FILTER_CALLS: ClassVar[int] = 10_000

    def __init__(
        self,
        elements: Sequence[Ex],
        *,
        force_repr: Optional[str] = None,
        force_repr_braces: Optional[tuple[str, str]] = None,
        transformations: tuple[
            tuple[Literal["filter", "map"], Callable[[Ex], Any]],
            ...,
        ] = (),
    ):
        super().__init__()
        self.elements = cu.check_sample(elements, "sampled_from")
        assert self.elements
        self.force_repr = force_repr
        self.force_repr_braces = force_repr_braces
        # Ordered ("map" | "filter", function) pairs applied to each drawn element.
        self._transformations = transformations

        self._cached_repr: Optional[str] = None

    def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]:
        # Rather than wrapping in a generic mapped strategy, return a copy of
        # this strategy with the mapping appended to its transformations.
        extended = (*self._transformations, ("map", pack))
        mapped = type(self)(
            self.elements,
            force_repr=self.force_repr,
            force_repr_braces=self.force_repr_braces,
            transformations=extended,
        )
        # guaranteed by the ("map", pack) transformation
        return cast(SearchStrategy[T], mapped)

    def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
        # Same idea as map(): record the predicate instead of wrapping.
        extended = (*self._transformations, ("filter", condition))
        return type(self)(
            self.elements,
            force_repr=self.force_repr,
            force_repr_braces=self.force_repr_braces,
            transformations=extended,
        )

    def __repr__(self):
        if self._cached_repr is not None:
            return self._cached_repr

        describe = get_pretty_function_description
        # Only the first 512 elements are spelled out for long sequences.
        truncated = len(self.elements) > 512
        shown = self.elements[:512] if truncated else self.elements
        elements_s = ", ".join(describe(v) for v in shown)
        if truncated:
            elements_s += ", ..."
        braces = self.force_repr_braces or ("(", ")")
        base = self.force_repr or f"sampled_from({braces[0]}{elements_s}{braces[1]})"
        suffix = "".join(
            f".{kind}({describe(fn)})" for kind, fn in self._transformations
        )
        self._cached_repr = base + suffix
        return self._cached_repr

    def calc_label(self) -> int:
        # strategy.label is effectively an under-approximation of structural
        # equality (i.e., some strategies may have the same label when they are not
        # structurally identical). More importantly for calculating the
        # SampledFromStrategy label, we might have hash(s1) != hash(s2) even
        # when s1 and s2 are structurally identical. For instance:
        #
        #   s1 = st.sampled_from([st.none()])
        #   s2 = st.sampled_from([st.none()])
        #   assert hash(s1) != hash(s2)
        #
        # (see also test cases in test_labels.py).
        #
        # We therefore use the labels of any component strategies when calculating
        # our label, and only use the hash if it is not a strategy.
        #
        # That's the ideal, anyway. In reality the logic is more complicated than
        # necessary in order to be efficient in the presence of (very) large sequences:
        # * add an unabashed special case for range, to avoid iteration over an
        #   enormous range when we know it is entirely integers.
        # * if there is at least one strategy in self.elements, use strategy label,
        #   and the element hash otherwise.
        # * if there are no strategies in self.elements, take the hash of the
        #   entire sequence. This prevents worst-case performance of hashing each
        #   element when a hash of the entire sequence would have sufficed.
        #
        # The worst case performance of this scheme is
        # itertools.chain(range(2**100), [st.none()]), where it degrades to
        # hashing every int in the range.
        sequence_hashable, sequence_hash = _is_hashable(self.elements)
        use_sequence_hash = isinstance(self.elements, range)
        if not use_sequence_hash and sequence_hashable:
            use_sequence_hash = all(
                not isinstance(e, SearchStrategy) for e in self.elements
            )
        if use_sequence_hash:
            return combine_labels(
                self.class_label, calc_label_from_name(str(sequence_hash))
            )

        # Per-element fallback; unhashable elements contribute nothing.
        parts = [self.class_label]
        for element in self.elements:
            if is_hashable(element):
                if isinstance(element, SearchStrategy):
                    parts.append(element.label)
                else:
                    parts.append(calc_label_from_hash(element))
        return combine_labels(*parts)

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        # Because our custom .map/.filter implementations skip the normal
        # wrapper strategies (which would automatically return False for us),
        # we need to manually return False here if any transformations have
        # been applied.
        return not self._transformations

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        return is_hashable(self.elements)

    def _transform(
        self,
        # https://github.com/python/mypy/issues/7049, we're not writing `element`
        # anywhere in the class so this is still type-safe. mypy is being more
        # conservative than necessary
        element: Ex,  # type: ignore
    ) -> Union[Ex, UniqueIdentifier]:
        # Used in UniqueSampledListStrategy
        #
        # Runs the recorded transformations in order. Returns the transformed
        # element, or ``filter_not_satisfied`` as soon as a filter rejects it.
        for kind, fn in self._transformations:
            if kind != "map":
                assert kind == "filter"
                if not fn(element):
                    return filter_not_satisfied
                continue
            mapped = fn(element)
            build_context = _current_build_context.value
            if build_context:
                build_context.record_call(mapped, fn, args=[element], kwargs={})
            element = mapped
        return element

    def do_draw(self, data: ConjectureData) -> Ex:
        drawn = self.do_filtered_draw(data)
        if isinstance(drawn, SearchStrategy) and all(
            isinstance(x, SearchStrategy) for x in self.elements
        ):
            # Every element is itself a strategy: stash a hint on ``data``.
            # NOTE(review): the text says "sample_from" although this class
            # identifies itself as "sampled_from" elsewhere; left unchanged
            # because the string is runtime-visible.
            data._sampled_from_all_strategies_elements_message = (
                "sample_from was given a collection of strategies: "
                "{!r}. Was one_of intended?",
                self.elements,
            )
        if drawn is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
        assert not isinstance(drawn, UniqueIdentifier)
        return drawn

    def get_element(self, i: int) -> Union[Ex, UniqueIdentifier]:
        # The i-th element after transformations (or ``filter_not_satisfied``).
        return self._transform(self.elements[i])

    def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
        # Indices already found to fail the filter during this draw, so that
        # no element is tested twice.
        rejected: set[int] = set()

        # Start with ordinary rejection sampling. It's fast if it works, and
        # if it doesn't work then it was only a small amount of overhead.
        for _ in range(3):
            index = data.draw_integer(0, len(self.elements) - 1)
            if index in rejected:
                continue
            candidate = self.get_element(index)
            if candidate is not filter_not_satisfied:
                return candidate
            if not rejected:
                # Record the retry event once, on the first rejection.
                data.events[f"Retried draw from {self!r} to satisfy filter"] = ""
            rejected.add(index)

        # If we've tried all the possible elements, give up now.
        max_good_indices = len(self.elements) - len(rejected)
        if not max_good_indices:
            return filter_not_satisfied

        # Impose an arbitrary cutoff to prevent us from wasting too much time
        # on very large element lists.
        max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3)

        # Before building the list of allowed indices, speculatively choose
        # one of them. We don't yet know how many allowed indices there will be,
        # so this choice might be out-of-bounds, but that's OK.
        speculative_index = data.draw_integer(0, max_good_indices - 1)

        # Collect the allowed (index, element) pairs so that we can choose one
        # at random - but stop as soon as we reach the speculatively-chosen
        # one. The elements are kept alongside the indices in case of
        # .map(some_stateful_function).
        allowed: list[tuple[int, Ex]] = []
        for index in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)):
            if index in rejected:
                continue
            candidate = self.get_element(index)
            if candidate is filter_not_satisfied:
                continue
            assert not isinstance(candidate, UniqueIdentifier)
            allowed.append((index, candidate))
            if len(allowed) > speculative_index:
                # Early-exit case: We reached the speculative index, so
                # we just return the corresponding element.
                data.draw_integer(0, len(self.elements) - 1, forced=index)
                return candidate

        # The speculative index didn't work out, but at this point we've built
        # and can choose from the complete list of allowed indices and elements.
        if allowed:
            index, candidate = data.choice(allowed)
            data.draw_integer(0, len(self.elements) - 1, forced=index)
            return candidate
        # If there are no allowed indices, the filter couldn't be satisfied.
        return filter_not_satisfied

773 

774 

class OneOfStrategy(SearchStrategy[Ex]):
    """A union of strategies: generated values may come from any member.

    The conditional distribution draws uniformly at random from some
    non-empty subset of the member strategies, and then draws from the
    conditional distribution of whichever strategy was picked.
    """

    def __init__(self, strategies: Sequence[SearchStrategy[Ex]]):
        super().__init__()
        # The arguments as given; used for the repr, the label, ``filter``
        # and the recursive ``calc_*`` properties.
        self.original_strategies = tuple(strategies)
        # Flattened branches, filled in lazily by ``element_strategies``.
        self.__element_strategies: Optional[Sequence[SearchStrategy[Ex]]] = None
        # Set while ``branches`` is computing ``element_strategies``; it is
        # only touched while holding ``_branches_lock``.
        self.__in_branches = False
        self._branches_lock = RLock()

    def calc_is_empty(self, recur: RecurT) -> bool:
        # True only when ``recur`` is true for every original strategy.
        return all(map(recur, self.original_strategies))

    def calc_has_reusable_values(self, recur: RecurT) -> bool:
        # True only when ``recur`` is true for every original strategy.
        return all(map(recur, self.original_strategies))

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        # True only when ``recur`` is true for every original strategy.
        return all(map(recur, self.original_strategies))

    @property
    def element_strategies(self) -> Sequence[SearchStrategy[Ex]]:
        """The non-empty, de-duplicated branches of the original strategies.

        Built on first access and stored for later calls.  Each original
        argument is passed through ``check_strategy`` while flattening.
        """
        if self.__element_strategies is not None:
            return self.__element_strategies

        # De-duplication below relies on strategy hashing, which is
        # ``object.__hash__`` - i.e. strategies are told apart by identity
        # only.
        #
        # Defining a real ``__hash__`` (plus ``__eq__``, which is easy given
        # type() and hash()) would make this more powerful in principle, but
        # it is harder than it sounds:
        #
        # 1. Strategies are often distinguished by non-hashable attributes,
        #    or by attributes that hash equal ("^.+" / b"^.+").
        # 2. LazyStrategy: the wrapped strategy can't be reified without
        #    breaking laziness, so the lazy and non-lazy forms would each
        #    get their own hash.
        #
        # After several attempts, the minor benefits of hashable strategies
        # are simply not worth the engineering effort it would take.
        # See also issues #2291 and #2327.
        #
        # ``self`` is pre-seeded so that we never list ourselves as a branch.
        already_seen: set[SearchStrategy] = {self}
        flattened: list[SearchStrategy] = []
        for member in self.original_strategies:
            check_strategy(member)
            if member.is_empty:
                continue
            for branch in member.branches:
                if branch in already_seen or branch.is_empty:
                    continue
                already_seen.add(branch)
                flattened.append(branch)
        self.__element_strategies = flattened
        return self.__element_strategies

    def calc_label(self) -> int:
        # Combine our class label with the label of each original strategy.
        own_label = self.class_label
        member_labels = [member.label for member in self.original_strategies]
        return combine_labels(own_label, *member_labels)

    def do_draw(self, data: ConjectureData) -> Ex:
        # First choose one of the branches that is not currently empty for
        # this ``data``, then draw a value from the chosen branch.
        drawable_branches = SampledFromStrategy(self.element_strategies).filter(
            lambda s: not s.is_currently_empty(data)
        )
        chosen = data.draw(drawable_branches)
        return data.draw(chosen)

    def __repr__(self) -> str:
        members = ", ".join(repr(member) for member in self.original_strategies)
        return "one_of({})".format(members)

    def do_validate(self) -> None:
        # Validate every flattened branch.
        for branch in self.element_strategies:
            branch.validate()

    @property
    def branches(self) -> Sequence[SearchStrategy[Ex]]:
        """The flattened branches, or ``[self]`` on a re-entrant call."""
        if self.__element_strategies is not None:
            # common fast path which avoids the lock
            return self.element_strategies

        with self._branches_lock:
            if self.__in_branches:
                # We were reached again while already computing our own
                # branches, so report ourselves as a single branch.
                return [self]
            try:
                self.__in_branches = True
                return self.element_strategies
            finally:
                self.__in_branches = False

    def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]:
        # Apply the condition to each original strategy individually, and
        # wrap the resulting union in a FilteredStrategy with no conditions
        # of its own.
        filtered_members = [
            member.filter(condition) for member in self.original_strategies
        ]
        return FilteredStrategy(OneOfStrategy(filtered_members), conditions=())

871 

872 

@overload
def one_of(
    __args: Sequence[SearchStrategy[Ex]],
) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex], __a2: SearchStrategy[T]
) -> SearchStrategy[Union[Ex, T]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3]
) -> SearchStrategy[Union[Ex, T, T3]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
) -> SearchStrategy[Union[Ex, T, T3, T4]]:  # pragma: no cover
    ...


@overload
def one_of(
    __a1: SearchStrategy[Ex],
    __a2: SearchStrategy[T],
    __a3: SearchStrategy[T3],
    __a4: SearchStrategy[T4],
    __a5: SearchStrategy[T5],
) -> SearchStrategy[Union[Ex, T, T3, T4, T5]]:  # pragma: no cover
    ...


@overload
def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]:  # pragma: no cover
    ...


@defines_strategy(never_lazy=True)
def one_of(
    *args: Union[Sequence[SearchStrategy[Any]], SearchStrategy[Any]]
) -> SearchStrategy[Any]:
    # Mypy workaround alert: Any is too loose above; the return parameter
    # should be the union of the input parameters. Unfortunately, Mypy <=0.600
    # raises errors due to incompatible inputs instead. See #1270 for links.
    # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead.
    """Return a strategy which generates values from any of the argument
    strategies.

    Instead of several strategy arguments, a single iterable of strategies
    may be passed: ``one_of(x)`` and ``one_of(*x)`` are equivalent.

    Examples from this strategy will generally shrink to ones that come from
    strategies earlier in the list, and then shrink according to the
    behaviour of the strategy that produced them. To get good shrinking
    behaviour, put simpler strategies first, e.g. ``one_of(none(), text())``
    is better than ``one_of(text(), none())``.

    This is especially important when using recursive strategies, e.g.
    ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well,
    but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink
    very badly indeed.
    """
    if len(args) == 1:
        if not isinstance(args[0], SearchStrategy):
            # A lone non-strategy argument is treated as an iterable of
            # strategies.  If it can't be iterated, ``args`` is left as-is
            # and the checks below deal with it.
            try:
                args = tuple(args[0])
            except TypeError:
                pass
    if len(args) == 1 and isinstance(args[0], SearchStrategy):
        # This special-case means that we can one_of over lists of any size
        # without incurring any performance overhead when there is only one
        # strategy, and keeps our reprs simple.
        return args[0]
    if args and all(not isinstance(a, SearchStrategy) for a in args):
        # And this special case is to give a more-specific error message if it
        # seems that the user has confused `one_of()` for `sampled_from()`;
        # the remaining validation is left to OneOfStrategy. See PR #2627.
        raise InvalidArgument(
            f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used "
            "to combine strategies, but all of the arguments were of other types."
        )
    # The one-element-sequence form [(s1, s2, ...)] was unpacked above, so
    # from here on args is treated as an actual sequence of strategies.
    return OneOfStrategy(cast(Sequence[SearchStrategy], args))

973 

974 

class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]):
    """A strategy defined purely by converting values from another strategy.

    Its parameter and distribution come from that other strategy; each value
    drawn from it is passed through ``pack`` to produce the result.
    """

    def __init__(
        self,
        strategy: SearchStrategy[MappedFrom],
        pack: Callable[[MappedFrom], MappedTo],
    ) -> None:
        super().__init__()
        self.mapped_strategy = strategy
        self.pack = pack

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Delegates entirely to the wrapped strategy.
        return recur(self.mapped_strategy)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        # Delegates entirely to the wrapped strategy.
        return recur(self.mapped_strategy)

    def __repr__(self) -> str:
        # The repr is computed once and then stored on the instance.
        if hasattr(self, "_cached_repr"):
            return self._cached_repr
        self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})"
        return self._cached_repr

    def do_validate(self) -> None:
        self.mapped_strategy.validate()

    def do_draw(self, data: ConjectureData) -> MappedTo:
        with warnings.catch_warnings():
            packs_into_mapping_or_set = isinstance(self.pack, type) and issubclass(
                self.pack, (abc.Mapping, abc.Set)
            )
            if packs_into_mapping_or_set:
                # NOTE(review): presumably building a mapping/set can compare
                # bytes with str and so emit BytesWarning - confirm.
                warnings.simplefilter("ignore", BytesWarning)
            # Up to three attempts: an UnsatisfiedAssumption raised during an
            # attempt discards that attempt's span and we try again.
            for _attempt in range(3):
                try:
                    data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
                    drawn = data.draw(self.mapped_strategy)
                    packed = self.pack(drawn)
                    data.stop_span()
                    current_build_context().record_call(
                        packed, self.pack, args=[drawn], kwargs={}
                    )
                    return packed
                except UnsatisfiedAssumption:
                    data.stop_span(discard=True)
        # Every attempt failed, so give up on this draw.
        raise UnsatisfiedAssumption

    @property
    def branches(self) -> Sequence[SearchStrategy[MappedTo]]:
        # Map each branch of the wrapped strategy with the same ``pack``.
        mapped_branches = []
        for branch in self.mapped_strategy.branches:
            mapped_branches.append(MappedStrategy(branch, pack=self.pack))
        return mapped_branches

    def filter(
        self, condition: Callable[[MappedTo], Any]
    ) -> "SearchStrategy[MappedTo]":
        # Includes a special case so that we can rewrite filters on collection
        # lengths, when most collections are `st.lists(...).map(the_type)`.
        list_strategy_cls = _list_strategy_type()
        if not isinstance(self.mapped_strategy, list_strategy_cls):
            return super().filter(condition)

        pack_is_collection_type = isinstance(self.pack, type) and issubclass(
            self.pack, abc.Collection
        )
        if not (pack_is_collection_type or self.pack in _collection_ish_functions()):
            return super().filter(condition)

        # Ask the inner list strategy whether it can rewrite this condition.
        # If it can't, throw that result away and _only_ add an outer filter.
        rewritten = list_strategy_cls.filter(self.mapped_strategy, condition)
        if getattr(rewritten, "filtered_strategy", None) is self.mapped_strategy:
            return super().filter(condition)  # didn't rewrite

        # The outer filter is still applied after a successful rewrite,
        # because some collections can change the list length (dict, set, etc).
        remapped = type(self)(rewritten, self.pack)
        return FilteredStrategy(remapped, conditions=(condition,))

1053 

1054 

@lru_cache
def _list_strategy_type() -> Any:
    """Return the ``ListStrategy`` class, importing it on first use.

    The import happens inside the function rather than at module level, and
    ``lru_cache`` means it only runs on the first call.
    """
    import hypothesis.strategies._internal.collections as _collections

    return _collections.ListStrategy

1060 

1061 

def _collection_ish_functions() -> Sequence[Any]:
    """Return a fresh list of functions treated like collection constructors.

    ``sorted`` is always included.  NumPy's array-creation functions are
    added only if ``numpy`` is already present in ``sys.modules``; this
    function never imports NumPy itself.
    """
    collection_makers = [sorted]
    np = sys.modules.get("numpy")
    if np:
        # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html
        # Probably only `np.array` and `np.asarray` will be used in practice,
        # but why should that stop us when we've already gone this far?
        collection_makers.extend(
            (
                np.empty_like,
                np.eye,
                np.identity,
                np.ones_like,
                np.zeros_like,
                np.array,
                np.asarray,
                np.asanyarray,
                np.ascontiguousarray,
                np.asmatrix,
                np.copy,
                np.rec.array,
                np.rec.fromarrays,
                np.rec.fromrecords,
                np.diag,
                # bonus undocumented functions from tab-completion:
                np.asarray_chkfinite,
                np.asfortranarray,
            )
        )

    return collection_makers

1090 

1091 

# Sentinel returned by the ``do_filtered_draw`` methods in this file when no
# value satisfying the filter could be drawn.  Callers compare against it by
# identity (``is`` / ``is not``).
filter_not_satisfied = UniqueIdentifier("filter not satisfied")

1093 

1094 

class FilteredStrategy(SearchStrategy[Ex]):
    """A strategy that only yields values satisfying all of its conditions.

    Filters applied on top of an existing ``FilteredStrategy`` are flattened,
    so ``filtered_strategy`` is never itself a ``FilteredStrategy`` and
    ``flat_conditions`` holds every predicate, oldest first.
    """

    def __init__(
        self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...]
    ):
        super().__init__()
        if isinstance(strategy, FilteredStrategy):
            # Flatten chained filters into a single filter with multiple conditions.
            all_conditions = strategy.flat_conditions + conditions
            innermost = strategy.filtered_strategy
        else:
            all_conditions = conditions
            innermost = strategy
        self.flat_conditions: tuple[Callable[[Ex], Any], ...] = all_conditions
        self.filtered_strategy: SearchStrategy[Ex] = innermost

        assert isinstance(self.flat_conditions, tuple)
        assert not isinstance(self.filtered_strategy, FilteredStrategy)

        # Combined predicate, built lazily by the ``condition`` property.
        self.__condition: Optional[Callable[[Ex], Any]] = None

    def calc_is_empty(self, recur: RecurT) -> bool:
        # Delegates entirely to the wrapped strategy.
        return recur(self.filtered_strategy)

    def calc_is_cacheable(self, recur: RecurT) -> bool:
        # Delegates entirely to the wrapped strategy.
        return recur(self.filtered_strategy)

    def __repr__(self) -> str:
        # The repr is computed once and then stored on the instance.
        if hasattr(self, "_cached_repr"):
            return self._cached_repr
        filter_calls = "".join(
            f".filter({get_pretty_function_description(cond)})"
            for cond in self.flat_conditions
        )
        self._cached_repr = "{!r}{}".format(self.filtered_strategy, filter_calls)
        return self._cached_repr

    def do_validate(self) -> None:
        # Validate the inner strategy first. If it was a LazyStrategy,
        # validation also reifies it, so that later calls to e.g. `.filter()`
        # are passed through to the real strategy.
        self.filtered_strategy.validate()
        # With a reified inner strategy in hand, replay every saved predicate
        # in case some or all of them can now be rewritten. Note that each
        # step replaces the `rebuilt` strategy too!
        rebuilt = self.filtered_strategy
        for cond in self.flat_conditions:
            rebuilt = rebuilt.filter(cond)
        if isinstance(rebuilt, FilteredStrategy):
            # At least some predicates were not rewritten; adopt the rebuilt
            # strategy's inner strategy and remaining conditions.
            new_inner = rebuilt.filtered_strategy
            new_conditions = rebuilt.flat_conditions
        else:
            # *All* the predicates were rewritten, so no conditions remain.
            new_inner = rebuilt
            new_conditions = ()
        # do_validate() is an in-place method, so either way we simply
        # re-initialize this strategy.
        FilteredStrategy.__init__(self, new_inner, new_conditions)

    def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]":
        # If we can, it's more efficient to rewrite our strategy to satisfy the
        # condition. Predicate order doesn't matter
        # (`f(x) and g(x) == g(x) and f(x)`), so we try applying the condition
        # directly to our filtered strategy as the inner-most filter.
        attempt = self.filtered_strategy.filter(condition)
        if not isinstance(attempt, FilteredStrategy):
            # It *could* be rewritten, so we return the more efficient form!
            return FilteredStrategy(attempt, self.flat_conditions)
        # It couldn't be rewritten and we got a new FilteredStrategy back, so
        # combine both sets of conditions in our expected newest=last order.
        combined_conditions = self.flat_conditions + attempt.flat_conditions
        return FilteredStrategy(attempt.filtered_strategy, combined_conditions)

    @property
    def condition(self) -> Callable[[Ex], Any]:
        # We write this defensively to avoid any threading race conditions
        # with our manual FilteredStrategy.__init__ for filter-rewriting.
        # See https://github.com/HypothesisWorks/hypothesis/pull/4522.
        cached = self.__condition
        if cached is not None:
            return cached

        if len(self.flat_conditions) == 1:
            # Avoid an extra indirection in the common case of only one condition.
            combined = self.flat_conditions[0]
        elif not self.flat_conditions:
            # Possible, if unlikely, due to filter predicate rewriting
            combined = lambda _: True  # type: ignore # covariant type param
        else:
            combined = lambda x: all(  # type: ignore # covariant type param
                cond(x) for cond in self.flat_conditions
            )
        self.__condition = combined
        return combined

    def do_draw(self, data: ConjectureData) -> Ex:
        outcome = self.do_filtered_draw(data)
        if outcome is filter_not_satisfied:
            data.mark_invalid(f"Aborted test because unable to satisfy {self!r}")
        else:
            return cast(Ex, outcome)

    def do_filtered_draw(self, data: ConjectureData) -> Union[Ex, UniqueIdentifier]:
        # Up to three attempts; an attempt whose value fails the condition has
        # its span discarded.
        for attempt in range(3):
            data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL)
            candidate = data.draw(self.filtered_strategy)
            if self.condition(candidate):
                data.stop_span()
                return candidate
            data.stop_span(discard=True)
            if attempt == 0:
                # Record the retry event once, after the first failure.
                data.events[f"Retried draw from {self!r} to satisfy filter"] = ""

        return filter_not_satisfied

    @property
    def branches(self) -> Sequence[SearchStrategy[Ex]]:
        # Apply our full set of conditions to each branch of the inner strategy.
        return [
            FilteredStrategy(strategy=branch, conditions=self.flat_conditions)
            for branch in self.filtered_strategy.branches
        ]

1217 

1218 

@check_function
def check_strategy(arg: object, name: str = "") -> None:
    """Raise ``InvalidArgument`` unless ``arg`` is a ``SearchStrategy``.

    ``name`` is the argument name to mention in the error message, if any.
    When ``arg`` is a list or tuple, the message also suggests
    ``st.sampled_from``.
    """
    assert isinstance(name, str)
    if isinstance(arg, SearchStrategy):
        return

    suggestion = ""
    if isinstance(arg, (list, tuple)):
        suggestion = ", such as st.sampled_from({}),".format(name or "...")
    if name:
        name += "="
    raise InvalidArgument(
        f"Expected a SearchStrategy{suggestion} but got {name}{arg!r} "
        f"(type={type(arg).__name__})"
    )