Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/strategies/_internal/strategies.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

493 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import sys 

12import threading 

13import warnings 

14from collections import abc, defaultdict 

15from collections.abc import Callable, Sequence 

16from functools import lru_cache 

17from random import shuffle 

18from threading import RLock 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 ClassVar, 

23 Generic, 

24 Literal, 

25 TypeAlias, 

26 TypeVar, 

27 cast, 

28 overload, 

29) 

30 

31from hypothesis._settings import HealthCheck, Phase, Verbosity, settings 

32from hypothesis.control import _current_build_context, current_build_context 

33from hypothesis.errors import ( 

34 HypothesisException, 

35 HypothesisWarning, 

36 InvalidArgument, 

37 NonInteractiveExampleWarning, 

38 UnsatisfiedAssumption, 

39) 

40from hypothesis.internal.conjecture import utils as cu 

41from hypothesis.internal.conjecture.data import ConjectureData 

42from hypothesis.internal.conjecture.utils import ( 

43 calc_label_from_cls, 

44 calc_label_from_hash, 

45 calc_label_from_name, 

46 combine_labels, 

47) 

48from hypothesis.internal.coverage import check_function 

49from hypothesis.internal.reflection import ( 

50 get_pretty_function_description, 

51 is_identity_function, 

52) 

53from hypothesis.strategies._internal.utils import defines_strategy 

54from hypothesis.utils.conventions import UniqueIdentifier 

55 

56if TYPE_CHECKING: 

57 Ex = TypeVar("Ex", covariant=True, default=Any) 

58else: 

59 Ex = TypeVar("Ex", covariant=True) 

60 

61T = TypeVar("T") 

62T3 = TypeVar("T3") 

63T4 = TypeVar("T4") 

64T5 = TypeVar("T5") 

65MappedFrom = TypeVar("MappedFrom") 

66MappedTo = TypeVar("MappedTo") 

67RecurT: TypeAlias = Callable[["SearchStrategy"], bool] 

68calculating = UniqueIdentifier("calculating") 

69 

70MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name( 

71 "another attempted draw in MappedStrategy" 

72) 

73 

74FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name( 

75 "single loop iteration in FilteredStrategy" 

76) 

77 

78label_lock = RLock() 

79 

80 

81def recursive_property(strategy: "SearchStrategy", name: str, default: object) -> Any: 

82 """Handle properties which may be mutually recursive among a set of 

83 strategies. 

84 

85 These are essentially lazily cached properties, with the ability to set 

86 an override: If the property has not been explicitly set, we calculate 

87 it on first access and memoize the result for later. 

88 

89 The problem is that for properties that depend on each other, a naive 

90 calculation strategy may hit infinite recursion. Consider for example 

91 the property is_empty. A strategy defined as x = st.deferred(lambda: x) 

92 is certainly empty (in order to draw a value from x we would have to 

93 draw a value from x, for which we would have to draw a value from x, 

94 ...), but in order to calculate it the naive approach would end up 

95 calling x.is_empty in order to calculate x.is_empty in order to etc. 

96 

97 The solution is one of fixed point calculation. We start with a default 

98 value that is the value of the property in the absence of evidence to 

99 the contrary, and then update the values of the property for all 

100 dependent strategies until we reach a fixed point. 

101 

102 The approach taken roughly follows that in section 4.2 of Adams, 

103 Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity 

104 and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6 

105 (2016): 224-236. 

106 """ 

107 assert name in {"is_empty", "has_reusable_values", "is_cacheable"} 

108 cache_key = "cached_" + name 

109 calculation = "calc_" + name 

110 force_key = "force_" + name 

111 

112 def forced_value(target: SearchStrategy) -> Any: 

113 try: 

114 return getattr(target, force_key) 

115 except AttributeError: 

116 return getattr(target, cache_key) 

117 

118 try: 

119 return forced_value(strategy) 

120 except AttributeError: 

121 pass 

122 

123 mapping: dict[SearchStrategy, Any] = {} 

124 sentinel = object() 

125 hit_recursion = False 

126 

127 # For a first pass we do a direct recursive calculation of the 

128 # property, but we block recursively visiting a value in the 

129 # computation of its property: When that happens, we simply 

130 # note that it happened and return the default value. 

131 def recur(strat: SearchStrategy) -> Any: 

132 nonlocal hit_recursion 

133 try: 

134 return forced_value(strat) 

135 except AttributeError: 

136 pass 

137 result = mapping.get(strat, sentinel) 

138 if result is calculating: 

139 hit_recursion = True 

140 return default 

141 elif result is sentinel: 

142 mapping[strat] = calculating 

143 mapping[strat] = getattr(strat, calculation)(recur) 

144 return mapping[strat] 

145 return result 

146 

147 recur(strategy) 

148 

149 # If we hit self-recursion in the computation of any strategy 

150 # value, our mapping at the end is imprecise - it may or may 

151 # not have the right values in it. We now need to proceed with 

152 # a more careful fixed point calculation to get the exact 

153 # values. Hopefully our mapping is still pretty good and it 

154 # won't take a large number of updates to reach a fixed point. 

155 if hit_recursion: 

156 needs_update = set(mapping) 

157 

158 # We track which strategies use which in the course of 

159 # calculating their property value. If A ever uses B in 

160 # the course of calculating its value, then whenever the 

161 # value of B changes we might need to update the value of 

162 # A. 

163 listeners: dict[SearchStrategy, set[SearchStrategy]] = defaultdict(set) 

164 else: 

165 needs_update = None 

166 

167 def recur2(strat: SearchStrategy) -> Any: 

168 def recur_inner(other: SearchStrategy) -> Any: 

169 try: 

170 return forced_value(other) 

171 except AttributeError: 

172 pass 

173 listeners[other].add(strat) 

174 result = mapping.get(other, sentinel) 

175 if result is sentinel: 

176 assert needs_update is not None 

177 needs_update.add(other) 

178 mapping[other] = default 

179 return default 

180 return result 

181 

182 return recur_inner 

183 

184 count = 0 

185 seen = set() 

186 while needs_update: 

187 count += 1 

188 # If we seem to be taking a really long time to stabilize we 

189 # start tracking seen values to attempt to detect an infinite 

190 # loop. This should be impossible, and most code will never 

191 # hit the count, but having an assertion for it means that 

192 # testing is easier to debug and we don't just have a hung 

193 # test. 

194 # Note: This is actually covered, by test_very_deep_deferral 

195 # in tests/cover/test_deferred_strategies.py. Unfortunately it 

196 # runs into a coverage bug. See 

197 # https://github.com/nedbat/coveragepy/issues/605 

198 # for details. 

199 if count > 50: # pragma: no cover 

200 key = frozenset(mapping.items()) 

201 assert key not in seen, (key, name) 

202 seen.add(key) 

203 to_update = needs_update 

204 needs_update = set() 

205 for strat in to_update: 

206 new_value = getattr(strat, calculation)(recur2(strat)) 

207 if new_value != mapping[strat]: 

208 needs_update.update(listeners[strat]) 

209 mapping[strat] = new_value 

210 

211 # We now have a complete and accurate calculation of the 

212 # property values for everything we have seen in the course of 

213 # running this calculation. We simultaneously update all of 

214 # them (not just the strategy we started out with). 

215 for k, v in mapping.items(): 

216 setattr(k, cache_key, v) 

217 return getattr(strategy, cache_key) 

218 

219 

220class SearchStrategy(Generic[Ex]): 

221 """A ``SearchStrategy`` tells Hypothesis how to generate that kind of input. 

222 

223 This class is only part of the public API for use in type annotations, so that 

224 you can write e.g. ``-> SearchStrategy[Foo]`` for your function which returns 

225 ``builds(Foo, ...)``. Do not inherit from or directly instantiate this class. 

226 """ 

227 

228 __module__: str = "hypothesis.strategies" 

229 LABELS: ClassVar[dict[type, int]] = {} 

230 # triggers `assert isinstance(label, int)` under threading when setting this 

231 # in init instead of a classvar. I'm not sure why, init should be safe. But 

232 # this works so I'm not looking into it further atm. 

233 __label: int | UniqueIdentifier | None = None 

234 

235 def __init__(self): 

236 self.validate_called: dict[int, bool] = {} 

237 

238 def is_currently_empty(self, data: ConjectureData) -> bool: 

239 """ 

240 Returns whether this strategy is currently empty. Unlike ``empty``, 

241 which is computed based on static information and cannot change, 

242 ``is_currently_empty`` may change over time based on choices made 

243 during the test case. 

244 

245 This is currently only used for stateful testing, where |Bundle| grows a 

246 list of values to choose from over the course of a test case. 

247 

248 ``data`` will only be used for introspection. No values will be drawn 

249 from it in a way that modifies the choice sequence. 

250 """ 

251 return self.is_empty 

252 

253 @property 

254 def is_empty(self) -> Any: 

255 # Returns True if this strategy can never draw a value and will always 

256 # result in the data being marked invalid. 

257 # The fact that this returns False does not guarantee that a valid value 

258 # can be drawn - this is not intended to be perfect, and is primarily 

259 # intended to be an optimisation for some cases. 

260 return recursive_property(self, "is_empty", True) 

261 

262 # Returns True if values from this strategy can safely be reused without 

263 # this causing unexpected behaviour. 

264 

265 # True if values from this strategy can be implicitly reused (e.g. as 

266 # background values in a numpy array) without causing surprising 

267 # user-visible behaviour. Should be false for built-in strategies that 

268 # produce mutable values, and for strategies that have been mapped/filtered 

269 # by arbitrary user-provided functions. 

270 @property 

271 def has_reusable_values(self) -> Any: 

272 return recursive_property(self, "has_reusable_values", True) 

273 

274 @property 

275 def is_cacheable(self) -> Any: 

276 """ 

277 Whether it is safe to hold on to instances of this strategy in a cache. 

278 See _STRATEGY_CACHE. 

279 """ 

280 return recursive_property(self, "is_cacheable", True) 

281 

282 def calc_is_cacheable(self, recur: RecurT) -> bool: 

283 return True 

284 

285 def calc_is_empty(self, recur: RecurT) -> bool: 

286 # Note: It is correct and significant that the default return value 

287 # from calc_is_empty is False despite the default value for is_empty 

288 # being true. The reason for this is that strategies should be treated 

289 # as empty absent evidence to the contrary, but most basic strategies 

290 # are trivially non-empty and it would be annoying to have to override 

291 # this method to show that. 

292 return False 

293 

294 def calc_has_reusable_values(self, recur: RecurT) -> bool: 

295 return False 

296 

297 def example(self) -> Ex: # FIXME 

298 """Provide an example of the sort of value that this strategy generates. 

299 

300 This method is designed for use in a REPL, and will raise an error if 

301 called from inside |@given| or a strategy definition. For serious use, 

302 see |@composite| or |st.data|. 

303 """ 

304 if getattr(sys, "ps1", None) is None: # pragma: no branch 

305 # The other branch *is* covered in cover/test_examples.py; but as that 

306 # uses `pexpect` for an interactive session `coverage` doesn't see it. 

307 warnings.warn( 

308 "The `.example()` method is good for exploring strategies, but should " 

309 "only be used interactively. We recommend using `@given` for tests - " 

310 "it performs better, saves and replays failures to avoid flakiness, " 

311 f"and reports minimal examples. (strategy: {self!r})", 

312 NonInteractiveExampleWarning, 

313 stacklevel=2, 

314 ) 

315 

316 context = _current_build_context.value 

317 if context is not None: 

318 if context.data is not None and context.data.depth > 0: 

319 raise HypothesisException( 

320 "Using example() inside a strategy definition is a bad " 

321 "idea. Instead consider using hypothesis.strategies.builds() " 

322 "or @hypothesis.strategies.composite to define your strategy." 

323 " See https://hypothesis.readthedocs.io/en/latest/data.html" 

324 "#hypothesis.strategies.builds or " 

325 "https://hypothesis.readthedocs.io/en/latest/data.html" 

326 "#composite-strategies for more details." 

327 ) 

328 else: 

329 raise HypothesisException( 

330 "Using example() inside a test function is a bad " 

331 "idea. Instead consider using hypothesis.strategies.data() " 

332 "to draw more examples during testing. See " 

333 "https://hypothesis.readthedocs.io/en/latest/data.html" 

334 "#drawing-interactively-in-tests for more details." 

335 ) 

336 

337 try: 

338 return self.__examples.pop() 

339 except (AttributeError, IndexError): 

340 self.__examples: list[Ex] = [] 

341 

342 from hypothesis.core import given 

343 

344 # Note: this function has a weird name because it might appear in 

345 # tracebacks, and we want users to know that they can ignore it. 

346 @given(self) 

347 @settings( 

348 database=None, 

349 # generate only a few examples at a time to avoid slow interactivity 

350 # for large strategies. The overhead of @given is very small relative 

351 # to generation, so a small batch size is fine. 

352 max_examples=10, 

353 deadline=None, 

354 verbosity=Verbosity.quiet, 

355 phases=(Phase.generate,), 

356 suppress_health_check=list(HealthCheck), 

357 ) 

358 def example_generating_inner_function( 

359 ex: Ex, # type: ignore # mypy is overzealous in preventing covariant params 

360 ) -> None: 

361 self.__examples.append(ex) 

362 

363 example_generating_inner_function() 

364 shuffle(self.__examples) 

365 return self.__examples.pop() 

366 

367 def map(self, pack: Callable[[Ex], T]) -> "SearchStrategy[T]": 

368 """Returns a new strategy which generates a value from this one, and 

369 then returns ``pack(value)``. For example, ``integers().map(str)`` 

370 could generate ``str(5)`` == ``"5"``. 

371 """ 

372 if is_identity_function(pack): 

373 return self # type: ignore # Mypy has no way to know that `Ex == T` 

374 return MappedStrategy(self, pack=pack) 

375 

376 def flatmap( 

377 self, expand: Callable[[Ex], "SearchStrategy[T]"] 

378 ) -> "SearchStrategy[T]": # FIXME 

379 """Old syntax for a special case of |@composite|: 

380 

381 .. code-block:: python 

382 

383 @st.composite 

384 def flatmap_like(draw, base_strategy, expand): 

385 value = draw(base_strategy) 

386 new_strategy = expand(value) 

387 return draw(new_strategy) 

388 

389 We find that the greater readability of |@composite| usually outweighs 

390 the verbosity, with a few exceptions for simple cases or recipes like 

391 ``from_type(type).flatmap(from_type)`` ("pick a type, get a strategy for 

392 any instance of that type, and then generate one of those"). 

393 """ 

394 from hypothesis.strategies._internal.flatmapped import FlatMapStrategy 

395 

396 return FlatMapStrategy(self, expand=expand) 

397 

398 # Note that we previously had condition extracted to a type alias as 

399 # PredicateT. However, that was only useful when not specifying a relationship 

400 # between the generic Ts and some other function param / return value. 

401 # If we do want to - like here, where we want to say that the Ex arg to condition 

402 # is of the same type as the strategy's Ex - then you need to write out the 

403 # entire Callable[[Ex], Any] expression rather than use a type alias. 

404 # TypeAlias is *not* simply a macro that inserts the text. TypeAlias will not 

405 # reference the local TypeVar context. 

406 def filter(self, condition: Callable[[Ex], Any]) -> "SearchStrategy[Ex]": 

407 """Returns a new strategy that generates values from this strategy 

408 which satisfy the provided condition. 

409 

410 Note that if the condition is too hard to satisfy this might result 

411 in your tests failing with an Unsatisfiable exception. 

412 A basic version of the filtering logic would look something like: 

413 

414 .. code-block:: python 

415 

416 @st.composite 

417 def filter_like(draw, strategy, condition): 

418 for _ in range(3): 

419 value = draw(strategy) 

420 if condition(value): 

421 return value 

422 assume(False) 

423 """ 

424 return FilteredStrategy(self, conditions=(condition,)) 

425 

426 @property 

427 def branches(self) -> Sequence["SearchStrategy[Ex]"]: 

428 return [self] 

429 

430 def __or__(self, other: "SearchStrategy[T]") -> "SearchStrategy[Ex | T]": 

431 """Return a strategy which produces values by randomly drawing from one 

432 of this strategy or the other strategy. 

433 

434 This method is part of the public API. 

435 """ 

436 if not isinstance(other, SearchStrategy): 

437 raise ValueError(f"Cannot | a SearchStrategy with {other!r}") 

438 

439 # Unwrap explicitly or'd strategies. This turns the 

440 # common case of e.g. st.integers() | st.integers() | st.integers() from 

441 # 

442 # one_of(one_of(integers(), integers()), integers()) 

443 # 

444 # into 

445 # 

446 # one_of(integers(), integers(), integers()) 

447 # 

448 # This is purely an aesthetic unwrapping, for e.g. reprs. In practice 

449 # we use .branches / .element_strategies to get the list of possible 

450 # strategies, so this unwrapping is *not* necessary for correctness. 

451 strategies: list[SearchStrategy] = [] 

452 strategies.extend( 

453 self.original_strategies if isinstance(self, OneOfStrategy) else [self] 

454 ) 

455 strategies.extend( 

456 other.original_strategies if isinstance(other, OneOfStrategy) else [other] 

457 ) 

458 return OneOfStrategy(strategies) 

459 

460 def __bool__(self) -> bool: 

461 warnings.warn( 

462 f"bool({self!r}) is always True, did you mean to draw a value?", 

463 HypothesisWarning, 

464 stacklevel=2, 

465 ) 

466 return True 

467 

468 def validate(self) -> None: 

469 """Throw an exception if the strategy is not valid. 

470 

471 Strategies should implement ``do_validate``, which is called by this 

472 method. They should not override ``validate``. 

473 

474 This can happen due to invalid arguments, or lazy construction. 

475 """ 

476 thread_id = threading.get_ident() 

477 if self.validate_called.get(thread_id, False): 

478 return 

479 # we need to set validate_called before calling do_validate, for 

480 # recursive / deferred strategies. But if a thread switches after 

481 # validate_called but before do_validate, we might have a strategy 

482 # which does weird things like drawing when do_validate would error but 

483 # its params are technically valid (e.g. a param was passed as 1.0 

484 # instead of 1) and get into weird internal states. 

485 # 

486 # There are two ways to fix this. 

487 # (1) The first is a per-strategy lock around do_validate. Even though we 

488 # expect near-zero lock contention, this still adds the lock overhead. 

489 # (2) The second is allowing concurrent .validate calls. Since validation 

490 # is (assumed to be) deterministic, both threads will produce the same 

491 # end state, so the validation order or race conditions does not matter. 

492 # 

493 # In order to avoid the lock overhead of (1), we use (2) here. See also 

494 # discussion in https://github.com/HypothesisWorks/hypothesis/pull/4473. 

495 try: 

496 self.validate_called[thread_id] = True 

497 self.do_validate() 

498 self.is_empty 

499 self.has_reusable_values 

500 except Exception: 

501 self.validate_called[thread_id] = False 

502 raise 

503 

504 @property 

505 def class_label(self) -> int: 

506 cls = self.__class__ 

507 try: 

508 return cls.LABELS[cls] 

509 except KeyError: 

510 pass 

511 result = calc_label_from_cls(cls) 

512 cls.LABELS[cls] = result 

513 return result 

514 

515 @property 

516 def label(self) -> int: 

517 if isinstance((label := self.__label), int): 

518 # avoid locking if we've already completely computed the label. 

519 return label 

520 

521 with label_lock: 

522 if self.__label is calculating: 

523 return 0 

524 self.__label = calculating 

525 self.__label = self.calc_label() 

526 return self.__label 

527 

528 def calc_label(self) -> int: 

529 return self.class_label 

530 

531 def do_validate(self) -> None: 

532 pass 

533 

534 def do_draw(self, data: ConjectureData) -> Ex: 

535 raise NotImplementedError(f"{type(self).__name__}.do_draw") 

536 

537 

538def _is_hashable(value: object) -> tuple[bool, int | None]: 

539 # hashing can be expensive; return the hash value if we compute it, so that 

540 # callers don't have to recompute. 

541 try: 

542 return (True, hash(value)) 

543 except TypeError: 

544 return (False, None) 

545 

546 

547def is_hashable(value: object) -> bool: 

548 return _is_hashable(value)[0] 

549 

550 

551class SampledFromStrategy(SearchStrategy[Ex]): 

552 """A strategy which samples from a set of elements. This is essentially 

553 equivalent to using a OneOfStrategy over Just strategies but may be more 

554 efficient and convenient. 

555 """ 

556 

557 _MAX_FILTER_CALLS: ClassVar[int] = 10_000 

558 

559 def __init__( 

560 self, 

561 elements: Sequence[Ex], 

562 *, 

563 force_repr: str | None = None, 

564 force_repr_braces: tuple[str, str] | None = None, 

565 transformations: tuple[ 

566 tuple[Literal["filter", "map"], Callable[[Ex], Any]], 

567 ..., 

568 ] = (), 

569 ): 

570 super().__init__() 

571 self.elements = cu.check_sample(elements, "sampled_from") 

572 assert self.elements 

573 self.force_repr = force_repr 

574 self.force_repr_braces = force_repr_braces 

575 self._transformations = transformations 

576 

577 self._cached_repr: str | None = None 

578 

579 def map(self, pack: Callable[[Ex], T]) -> SearchStrategy[T]: 

580 s = type(self)( 

581 self.elements, 

582 force_repr=self.force_repr, 

583 force_repr_braces=self.force_repr_braces, 

584 transformations=(*self._transformations, ("map", pack)), 

585 ) 

586 # guaranteed by the ("map", pack) transformation 

587 return cast(SearchStrategy[T], s) 

588 

589 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]: 

590 return type(self)( 

591 self.elements, 

592 force_repr=self.force_repr, 

593 force_repr_braces=self.force_repr_braces, 

594 transformations=(*self._transformations, ("filter", condition)), 

595 ) 

596 

597 def __repr__(self): 

598 if self._cached_repr is None: 

599 rep = get_pretty_function_description 

600 elements_s = ( 

601 ", ".join(rep(v) for v in self.elements[:512]) + ", ..." 

602 if len(self.elements) > 512 

603 else ", ".join(rep(v) for v in self.elements) 

604 ) 

605 braces = self.force_repr_braces or ("(", ")") 

606 instance_s = ( 

607 self.force_repr or f"sampled_from({braces[0]}{elements_s}{braces[1]})" 

608 ) 

609 transforms_s = "".join( 

610 f".{name}({get_pretty_function_description(f)})" 

611 for name, f in self._transformations 

612 ) 

613 repr_s = instance_s + transforms_s 

614 self._cached_repr = repr_s 

615 return self._cached_repr 

616 

617 def calc_label(self) -> int: 

618 # strategy.label is effectively an under-approximation of structural 

619 # equality (i.e., some strategies may have the same label when they are not 

620 # structurally identical). More importantly for calculating the 

621 # SampledFromStrategy label, we might have hash(s1) != hash(s2) even 

622 # when s1 and s2 are structurally identical. For instance: 

623 # 

624 # s1 = st.sampled_from([st.none()]) 

625 # s2 = st.sampled_from([st.none()]) 

626 # assert hash(s1) != hash(s2) 

627 # 

628 # (see also test cases in test_labels.py). 

629 # 

630 # We therefore use the labels of any component strategies when calculating 

631 # our label, and only use the hash if it is not a strategy. 

632 # 

633 # That's the ideal, anyway. In reality the logic is more complicated than 

634 # necessary in order to be efficient in the presence of (very) large sequences: 

635 # * add an unabashed special case for range, to avoid iteration over an 

636 # enormous range when we know it is entirely integers. 

637 # * if there is at least one strategy in self.elements, use strategy label, 

638 # and the element hash otherwise. 

639 # * if there are no strategies in self.elements, take the hash of the 

640 # entire sequence. This prevents worst-case performance of hashing each 

641 # element when a hash of the entire sequence would have sufficed. 

642 # 

643 # The worst case performance of this scheme is 

644 # itertools.chain(range(2**100), [st.none()]), where it degrades to 

645 # hashing every int in the range. 

646 (elements_is_hashable, hash_value) = _is_hashable(self.elements) 

647 if isinstance(self.elements, range) or ( 

648 elements_is_hashable 

649 and not any(isinstance(e, SearchStrategy) for e in self.elements) 

650 ): 

651 return combine_labels( 

652 self.class_label, calc_label_from_name(str(hash_value)) 

653 ) 

654 

655 labels = [self.class_label] 

656 for element in self.elements: 

657 if not is_hashable(element): 

658 continue 

659 

660 labels.append( 

661 element.label 

662 if isinstance(element, SearchStrategy) 

663 else calc_label_from_hash(element) 

664 ) 

665 

666 return combine_labels(*labels) 

667 

668 def calc_has_reusable_values(self, recur: RecurT) -> bool: 

669 # Because our custom .map/.filter implementations skip the normal 

670 # wrapper strategies (which would automatically return False for us), 

671 # we need to manually return False here if any transformations have 

672 # been applied. 

673 return not self._transformations 

674 

675 def calc_is_cacheable(self, recur: RecurT) -> bool: 

676 return is_hashable(self.elements) 

677 

678 def _transform( 

679 self, 

680 # https://github.com/python/mypy/issues/7049, we're not writing `element` 

681 # anywhere in the class so this is still type-safe. mypy is being more 

682 # conservative than necessary 

683 element: Ex, # type: ignore 

684 ) -> Ex | UniqueIdentifier: 

685 # Used in UniqueSampledListStrategy 

686 for name, f in self._transformations: 

687 if name == "map": 

688 result = f(element) 

689 if build_context := _current_build_context.value: 

690 build_context.record_call(result, f, args=[element], kwargs={}) 

691 element = result 

692 else: 

693 assert name == "filter" 

694 if not f(element): 

695 return filter_not_satisfied 

696 return element 

697 

698 def do_draw(self, data: ConjectureData) -> Ex: 

699 result = self.do_filtered_draw(data) 

700 if isinstance(result, SearchStrategy) and all( 

701 isinstance(x, SearchStrategy) for x in self.elements 

702 ): 

703 data._sampled_from_all_strategies_elements_message = ( 

704 "sampled_from was given a collection of strategies: " 

705 "{!r}. Was one_of intended?", 

706 self.elements, 

707 ) 

708 if result is filter_not_satisfied: 

709 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}") 

710 assert not isinstance(result, UniqueIdentifier) 

711 return result 

712 

713 def get_element(self, i: int) -> Ex | UniqueIdentifier: 

714 return self._transform(self.elements[i]) 

715 

716 def do_filtered_draw(self, data: ConjectureData) -> Ex | UniqueIdentifier: 

717 # Set of indices that have been tried so far, so that we never test 

718 # the same element twice during a draw. 

719 known_bad_indices: set[int] = set() 

720 

721 # Start with ordinary rejection sampling. It's fast if it works, and 

722 # if it doesn't work then it was only a small amount of overhead. 

723 for _ in range(3): 

724 i = data.draw_integer(0, len(self.elements) - 1) 

725 if i not in known_bad_indices: 

726 element = self.get_element(i) 

727 if element is not filter_not_satisfied: 

728 return element 

729 if not known_bad_indices: 

730 data.events[f"Retried draw from {self!r} to satisfy filter"] = "" 

731 known_bad_indices.add(i) 

732 

733 # If we've tried all the possible elements, give up now. 

734 max_good_indices = len(self.elements) - len(known_bad_indices) 

735 if not max_good_indices: 

736 return filter_not_satisfied 

737 

738 # Impose an arbitrary cutoff to prevent us from wasting too much time 

739 # on very large element lists. 

740 max_good_indices = min(max_good_indices, self._MAX_FILTER_CALLS - 3) 

741 

742 # Before building the list of allowed indices, speculatively choose 

743 # one of them. We don't yet know how many allowed indices there will be, 

744 # so this choice might be out-of-bounds, but that's OK. 

745 speculative_index = data.draw_integer(0, max_good_indices - 1) 

746 

747 # Calculate the indices of allowed values, so that we can choose one 

748 # of them at random. But if we encounter the speculatively-chosen one, 

749 # just use that and return immediately. Note that we also track the 

750 # allowed elements, in case of .map(some_stateful_function) 

751 allowed: list[tuple[int, Ex]] = [] 

752 for i in range(min(len(self.elements), self._MAX_FILTER_CALLS - 3)): 

753 if i not in known_bad_indices: 

754 element = self.get_element(i) 

755 if element is not filter_not_satisfied: 

756 assert not isinstance(element, UniqueIdentifier) 

757 allowed.append((i, element)) 

758 if len(allowed) > speculative_index: 

759 # Early-exit case: We reached the speculative index, so 

760 # we just return the corresponding element. 

761 data.draw_integer(0, len(self.elements) - 1, forced=i) 

762 return element 

763 

764 # The speculative index didn't work out, but at this point we've built 

765 # and can choose from the complete list of allowed indices and elements. 

766 if allowed: 

767 i, element = data.choice(allowed) 

768 data.draw_integer(0, len(self.elements) - 1, forced=i) 

769 return element 

770 # If there are no allowed indices, the filter couldn't be satisfied. 

771 return filter_not_satisfied 

772 

773 

774class OneOfStrategy(SearchStrategy[Ex]): 

775 """Implements a union of strategies. Given a number of strategies this 

776 generates values which could have come from any of them. 

777 

778 The conditional distribution draws uniformly at random from some 

779 non-empty subset of these strategies and then draws from the 

780 conditional distribution of that strategy. 

781 """ 

782 

783 def __init__(self, strategies: Sequence[SearchStrategy[Ex]]): 

784 super().__init__() 

785 self.original_strategies = tuple(strategies) 

786 self.__element_strategies: Sequence[SearchStrategy[Ex]] | None = None 

787 self.__in_branches = False 

788 self._branches_lock = RLock() 

789 

790 def calc_is_empty(self, recur: RecurT) -> bool: 

791 return all(recur(e) for e in self.original_strategies) 

792 

793 def calc_has_reusable_values(self, recur: RecurT) -> bool: 

794 return all(recur(e) for e in self.original_strategies) 

795 

796 def calc_is_cacheable(self, recur: RecurT) -> bool: 

797 return all(recur(e) for e in self.original_strategies) 

798 

799 @property 

800 def element_strategies(self) -> Sequence[SearchStrategy[Ex]]: 

801 if self.__element_strategies is None: 

802 # While strategies are hashable, they use object.__hash__ and are 

803 # therefore distinguished only by identity. 

804 # 

805 # In principle we could "just" define a __hash__ method 

806 # (and __eq__, but that's easy in terms of type() and hash()) 

807 # to make this more powerful, but this is harder than it sounds: 

808 # 

809 # 1. Strategies are often distinguished by non-hashable attributes, 

810 # or by attributes that have the same hash value ("^.+" / b"^.+"). 

811 # 2. LazyStrategy: can't reify the wrapped strategy without breaking 

812 # laziness, so there's a hash each for the lazy and the nonlazy. 

813 # 

814 # Having made several attempts, the minor benefits of making strategies 

815 # hashable are simply not worth the engineering effort it would take. 

816 # See also issues #2291 and #2327. 

817 seen: set[SearchStrategy] = {self} 

818 strategies: list[SearchStrategy] = [] 

819 for arg in self.original_strategies: 

820 check_strategy(arg) 

821 if not arg.is_empty: 

822 for s in arg.branches: 

823 if s not in seen and not s.is_empty: 

824 seen.add(s) 

825 strategies.append(s) 

826 self.__element_strategies = strategies 

827 return self.__element_strategies 

828 

829 def calc_label(self) -> int: 

830 return combine_labels( 

831 self.class_label, *(p.label for p in self.original_strategies) 

832 ) 

833 

834 def do_draw(self, data: ConjectureData) -> Ex: 

835 strategy = data.draw( 

836 SampledFromStrategy(self.element_strategies).filter( 

837 lambda s: not s.is_currently_empty(data) 

838 ) 

839 ) 

840 return data.draw(strategy) 

841 

842 def __repr__(self) -> str: 

843 return "one_of({})".format(", ".join(map(repr, self.original_strategies))) 

844 

845 def do_validate(self) -> None: 

846 for e in self.element_strategies: 

847 e.validate() 

848 

849 @property 

850 def branches(self) -> Sequence[SearchStrategy[Ex]]: 

851 if self.__element_strategies is not None: 

852 # common fast path which avoids the lock 

853 return self.element_strategies 

854 

855 with self._branches_lock: 

856 if not self.__in_branches: 

857 try: 

858 self.__in_branches = True 

859 return self.element_strategies 

860 finally: 

861 self.__in_branches = False 

862 else: 

863 return [self] 

864 

865 def filter(self, condition: Callable[[Ex], Any]) -> SearchStrategy[Ex]: 

866 return FilteredStrategy( 

867 OneOfStrategy([s.filter(condition) for s in self.original_strategies]), 

868 conditions=(), 

869 ) 

870 

871 

872@overload 

873def one_of( 

874 __args: Sequence[SearchStrategy[Ex]], 

875) -> SearchStrategy[Ex]: # pragma: no cover 

876 ... 

877 

878 

879@overload 

880def one_of(__a1: SearchStrategy[Ex]) -> SearchStrategy[Ex]: # pragma: no cover 

881 ... 

882 

883 

884@overload 

885def one_of( 

886 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T] 

887) -> SearchStrategy[Ex | T]: # pragma: no cover 

888 ... 

889 

890 

891@overload 

892def one_of( 

893 __a1: SearchStrategy[Ex], __a2: SearchStrategy[T], __a3: SearchStrategy[T3] 

894) -> SearchStrategy[Ex | T | T3]: # pragma: no cover 

895 ... 

896 

897 

898@overload 

899def one_of( 

900 __a1: SearchStrategy[Ex], 

901 __a2: SearchStrategy[T], 

902 __a3: SearchStrategy[T3], 

903 __a4: SearchStrategy[T4], 

904) -> SearchStrategy[Ex | T | T3 | T4]: # pragma: no cover 

905 ... 

906 

907 

908@overload 

909def one_of( 

910 __a1: SearchStrategy[Ex], 

911 __a2: SearchStrategy[T], 

912 __a3: SearchStrategy[T3], 

913 __a4: SearchStrategy[T4], 

914 __a5: SearchStrategy[T5], 

915) -> SearchStrategy[Ex | T | T3 | T4 | T5]: # pragma: no cover 

916 ... 

917 

918 

919@overload 

920def one_of(*args: SearchStrategy[Any]) -> SearchStrategy[Any]: # pragma: no cover 

921 ... 

922 

923 

924@defines_strategy(eager=True) 

925def one_of( 

926 *args: Sequence[SearchStrategy[Any]] | SearchStrategy[Any], 

927) -> SearchStrategy[Any]: 

928 # Mypy workaround alert: Any is too loose above; the return parameter 

929 # should be the union of the input parameters. Unfortunately, Mypy <=0.600 

930 # raises errors due to incompatible inputs instead. See #1270 for links. 

931 # v0.610 doesn't error; it gets inference wrong for 2+ arguments instead. 

932 """Return a strategy which generates values from any of the argument 

933 strategies. 

934 

935 This may be called with one iterable argument instead of multiple 

936 strategy arguments, in which case ``one_of(x)`` and ``one_of(*x)`` are 

937 equivalent. 

938 

939 Examples from this strategy will generally shrink to ones that come from 

940 strategies earlier in the list, then shrink according to behaviour of the 

941 strategy that produced them. In order to get good shrinking behaviour, 

942 try to put simpler strategies first. e.g. ``one_of(none(), text())`` is 

943 better than ``one_of(text(), none())``. 

944 

945 This is especially important when using recursive strategies. e.g. 

946 ``x = st.deferred(lambda: st.none() | st.tuples(x, x))`` will shrink well, 

947 but ``x = st.deferred(lambda: st.tuples(x, x) | st.none())`` will shrink 

948 very badly indeed. 

949 """ 

950 if len(args) == 1 and not isinstance(args[0], SearchStrategy): 

951 try: 

952 args = tuple(args[0]) 

953 except TypeError: 

954 pass 

955 if len(args) == 1 and isinstance(args[0], SearchStrategy): 

956 # This special-case means that we can one_of over lists of any size 

957 # without incurring any performance overhead when there is only one 

958 # strategy, and keeps our reprs simple. 

959 return args[0] 

960 if args and not any(isinstance(a, SearchStrategy) for a in args): 

961 # And this special case is to give a more-specific error message if it 

962 # seems that the user has confused `one_of()` for `sampled_from()`; 

963 # the remaining validation is left to OneOfStrategy. See PR #2627. 

964 raise InvalidArgument( 

965 f"Did you mean st.sampled_from({list(args)!r})? st.one_of() is used " 

966 "to combine strategies, but all of the arguments were of other types." 

967 ) 

968 # we've handled the case where args is a one-element sequence [(s1, s2, ...)] 

969 # above, so we can assume it's an actual sequence of strategies. 

970 args = cast(Sequence[SearchStrategy], args) 

971 return OneOfStrategy(args) 

972 

973 

974class MappedStrategy(SearchStrategy[MappedTo], Generic[MappedFrom, MappedTo]): 

975 """A strategy which is defined purely by conversion to and from another 

976 strategy. 

977 

978 Its parameter and distribution come from that other strategy. 

979 """ 

980 

981 def __init__( 

982 self, 

983 strategy: SearchStrategy[MappedFrom], 

984 pack: Callable[[MappedFrom], MappedTo], 

985 ) -> None: 

986 super().__init__() 

987 self.mapped_strategy = strategy 

988 self.pack = pack 

989 

990 def calc_is_empty(self, recur: RecurT) -> bool: 

991 return recur(self.mapped_strategy) 

992 

993 def calc_is_cacheable(self, recur: RecurT) -> bool: 

994 return recur(self.mapped_strategy) 

995 

996 def __repr__(self) -> str: 

997 if not hasattr(self, "_cached_repr"): 

998 self._cached_repr = f"{self.mapped_strategy!r}.map({get_pretty_function_description(self.pack)})" 

999 return self._cached_repr 

1000 

1001 def do_validate(self) -> None: 

1002 self.mapped_strategy.validate() 

1003 

1004 def do_draw(self, data: ConjectureData) -> MappedTo: 

1005 with warnings.catch_warnings(): 

1006 if isinstance(self.pack, type) and issubclass( 

1007 self.pack, (abc.Mapping, abc.Set) 

1008 ): 

1009 warnings.simplefilter("ignore", BytesWarning) 

1010 for _ in range(3): 

1011 try: 

1012 data.start_span(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL) 

1013 x = data.draw(self.mapped_strategy) 

1014 result = self.pack(x) 

1015 data.stop_span() 

1016 current_build_context().record_call( 

1017 result, self.pack, args=[x], kwargs={} 

1018 ) 

1019 return result 

1020 except UnsatisfiedAssumption: 

1021 data.stop_span(discard=True) 

1022 raise UnsatisfiedAssumption 

1023 

1024 @property 

1025 def branches(self) -> Sequence[SearchStrategy[MappedTo]]: 

1026 return [ 

1027 MappedStrategy(strategy, pack=self.pack) 

1028 for strategy in self.mapped_strategy.branches 

1029 ] 

1030 

1031 def filter( 

1032 self, condition: Callable[[MappedTo], Any] 

1033 ) -> "SearchStrategy[MappedTo]": 

1034 # Includes a special case so that we can rewrite filters on collection 

1035 # lengths, when most collections are `st.lists(...).map(the_type)`. 

1036 ListStrategy = _list_strategy_type() 

1037 if not isinstance(self.mapped_strategy, ListStrategy) or not ( 

1038 (isinstance(self.pack, type) and issubclass(self.pack, abc.Collection)) 

1039 or self.pack in _collection_ish_functions() 

1040 ): 

1041 return super().filter(condition) 

1042 

1043 # Check whether our inner list strategy can rewrite this filter condition. 

1044 # If not, discard the result and _only_ apply a new outer filter. 

1045 new = ListStrategy.filter(self.mapped_strategy, condition) 

1046 if getattr(new, "filtered_strategy", None) is self.mapped_strategy: 

1047 return super().filter(condition) # didn't rewrite 

1048 

1049 # Apply a new outer filter even though we rewrote the inner strategy, 

1050 # because some collections can change the list length (dict, set, etc). 

1051 return FilteredStrategy(type(self)(new, self.pack), conditions=(condition,)) 

1052 

1053 

1054@lru_cache 

1055def _list_strategy_type() -> Any: 

1056 from hypothesis.strategies._internal.collections import ListStrategy 

1057 

1058 return ListStrategy 

1059 

1060 

1061def _collection_ish_functions() -> Sequence[Any]: 

1062 funcs = [sorted] 

1063 if np := sys.modules.get("numpy"): 

1064 # c.f. https://numpy.org/doc/stable/reference/routines.array-creation.html 

1065 # Probably only `np.array` and `np.asarray` will be used in practice, 

1066 # but why should that stop us when we've already gone this far? 

1067 funcs += [ 

1068 np.empty_like, 

1069 np.eye, 

1070 np.identity, 

1071 np.ones_like, 

1072 np.zeros_like, 

1073 np.array, 

1074 np.asarray, 

1075 np.asanyarray, 

1076 np.ascontiguousarray, 

1077 np.asmatrix, 

1078 np.copy, 

1079 np.rec.array, 

1080 np.rec.fromarrays, 

1081 np.rec.fromrecords, 

1082 np.diag, 

1083 # bonus undocumented functions from tab-completion: 

1084 np.asarray_chkfinite, 

1085 np.asfortranarray, 

1086 ] 

1087 

1088 return funcs 

1089 

1090 

1091filter_not_satisfied = UniqueIdentifier("filter not satisfied") 

1092 

1093 

1094class FilteredStrategy(SearchStrategy[Ex]): 

1095 def __init__( 

1096 self, strategy: SearchStrategy[Ex], conditions: tuple[Callable[[Ex], Any], ...] 

1097 ): 

1098 super().__init__() 

1099 if isinstance(strategy, FilteredStrategy): 

1100 # Flatten chained filters into a single filter with multiple conditions. 

1101 self.flat_conditions: tuple[Callable[[Ex], Any], ...] = ( 

1102 strategy.flat_conditions + conditions 

1103 ) 

1104 self.filtered_strategy: SearchStrategy[Ex] = strategy.filtered_strategy 

1105 else: 

1106 self.flat_conditions = conditions 

1107 self.filtered_strategy = strategy 

1108 

1109 assert isinstance(self.flat_conditions, tuple) 

1110 assert not isinstance(self.filtered_strategy, FilteredStrategy) 

1111 

1112 self.__condition: Callable[[Ex], Any] | None = None 

1113 

1114 def calc_is_empty(self, recur: RecurT) -> bool: 

1115 return recur(self.filtered_strategy) 

1116 

1117 def calc_is_cacheable(self, recur: RecurT) -> bool: 

1118 return recur(self.filtered_strategy) 

1119 

1120 def __repr__(self) -> str: 

1121 if not hasattr(self, "_cached_repr"): 

1122 self._cached_repr = "{!r}{}".format( 

1123 self.filtered_strategy, 

1124 "".join( 

1125 f".filter({get_pretty_function_description(cond)})" 

1126 for cond in self.flat_conditions 

1127 ), 

1128 ) 

1129 return self._cached_repr 

1130 

1131 def do_validate(self) -> None: 

1132 # Start by validating our inner filtered_strategy. If this was a LazyStrategy, 

1133 # validation also reifies it so that subsequent calls to e.g. `.filter()` will 

1134 # be passed through. 

1135 self.filtered_strategy.validate() 

1136 # So now we have a reified inner strategy, we'll replay all our saved 

1137 # predicates in case some or all of them can be rewritten. Note that this 

1138 # replaces the `fresh` strategy too! 

1139 fresh = self.filtered_strategy 

1140 for cond in self.flat_conditions: 

1141 fresh = fresh.filter(cond) 

1142 if isinstance(fresh, FilteredStrategy): 

1143 # In this case we have at least some non-rewritten filter predicates, 

1144 # so we just re-initialize the strategy. 

1145 FilteredStrategy.__init__( 

1146 self, fresh.filtered_strategy, fresh.flat_conditions 

1147 ) 

1148 else: 

1149 # But if *all* the predicates were rewritten... well, do_validate() is 

1150 # an in-place method so we still just re-initialize the strategy! 

1151 FilteredStrategy.__init__(self, fresh, ()) 

1152 

1153 def filter(self, condition: Callable[[Ex], Any]) -> "FilteredStrategy[Ex]": 

1154 # If we can, it's more efficient to rewrite our strategy to satisfy the 

1155 # condition. We therefore exploit the fact that the order of predicates 

1156 # doesn't matter (`f(x) and g(x) == g(x) and f(x)`) by attempting to apply 

1157 # condition directly to our filtered strategy as the inner-most filter. 

1158 out = self.filtered_strategy.filter(condition) 

1159 # If it couldn't be rewritten, we'll get a new FilteredStrategy - and then 

1160 # combine the conditions of each in our expected newest=last order. 

1161 if isinstance(out, FilteredStrategy): 

1162 return FilteredStrategy( 

1163 out.filtered_strategy, self.flat_conditions + out.flat_conditions 

1164 ) 

1165 # But if it *could* be rewritten, we can return the more efficient form! 

1166 return FilteredStrategy(out, self.flat_conditions) 

1167 

1168 @property 

1169 def condition(self) -> Callable[[Ex], Any]: 

1170 # We write this defensively to avoid any threading race conditions 

1171 # with our manual FilteredStrategy.__init__ for filter-rewriting. 

1172 # See https://github.com/HypothesisWorks/hypothesis/pull/4522. 

1173 if (condition := self.__condition) is not None: 

1174 return condition 

1175 

1176 if len(self.flat_conditions) == 1: 

1177 # Avoid an extra indirection in the common case of only one condition. 

1178 condition = self.flat_conditions[0] 

1179 elif len(self.flat_conditions) == 0: 

1180 # Possible, if unlikely, due to filter predicate rewriting 

1181 condition = lambda _: True # type: ignore # covariant type param 

1182 else: 

1183 condition = lambda x: all( # type: ignore # covariant type param 

1184 cond(x) for cond in self.flat_conditions 

1185 ) 

1186 self.__condition = condition 

1187 return condition 

1188 

1189 def do_draw(self, data: ConjectureData) -> Ex: 

1190 result = self.do_filtered_draw(data) 

1191 if result is not filter_not_satisfied: 

1192 return cast(Ex, result) 

1193 

1194 data.mark_invalid(f"Aborted test because unable to satisfy {self!r}") 

1195 

1196 def do_filtered_draw(self, data: ConjectureData) -> Ex | UniqueIdentifier: 

1197 for i in range(3): 

1198 data.start_span(FILTERED_SEARCH_STRATEGY_DO_DRAW_LABEL) 

1199 value = data.draw(self.filtered_strategy) 

1200 if self.condition(value): 

1201 data.stop_span() 

1202 return value 

1203 else: 

1204 data.stop_span(discard=True) 

1205 if i == 0: 

1206 data.events[f"Retried draw from {self!r} to satisfy filter"] = "" 

1207 

1208 return filter_not_satisfied 

1209 

1210 @property 

1211 def branches(self) -> Sequence[SearchStrategy[Ex]]: 

1212 return [ 

1213 FilteredStrategy(strategy=strategy, conditions=self.flat_conditions) 

1214 for strategy in self.filtered_strategy.branches 

1215 ] 

1216 

1217 

1218@check_function 

1219def check_strategy(arg: object, name: str = "") -> None: 

1220 assert isinstance(name, str) 

1221 if not isinstance(arg, SearchStrategy): 

1222 hint = "" 

1223 if isinstance(arg, (list, tuple)): 

1224 hint = ", such as st.sampled_from({}),".format(name or "...") 

1225 if name: 

1226 name += "=" 

1227 raise InvalidArgument( 

1228 f"Expected a SearchStrategy{hint} but got {name}{arg!r} " 

1229 f"(type={type(arg).__name__})" 

1230 )