Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/data.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

626 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import math 

12import time 

13from collections import defaultdict 

14from collections.abc import Hashable, Iterable, Iterator, Sequence 

15from enum import IntEnum 

16from functools import cached_property 

17from random import Random 

18from typing import ( 

19 TYPE_CHECKING, 

20 Any, 

21 Literal, 

22 NoReturn, 

23 Optional, 

24 TypeVar, 

25 Union, 

26 cast, 

27 overload, 

28) 

29 

30import attr 

31 

32from hypothesis.errors import ( 

33 CannotProceedScopeT, 

34 ChoiceTooLarge, 

35 Frozen, 

36 InvalidArgument, 

37 StopTest, 

38) 

39from hypothesis.internal.cache import LRUCache 

40from hypothesis.internal.compat import add_note 

41from hypothesis.internal.conjecture.choice import ( 

42 BooleanConstraints, 

43 BytesConstraints, 

44 ChoiceConstraintsT, 

45 ChoiceNode, 

46 ChoiceT, 

47 ChoiceTemplate, 

48 ChoiceTypeT, 

49 FloatConstraints, 

50 IntegerConstraints, 

51 StringConstraints, 

52 choice_constraints_key, 

53 choice_from_index, 

54 choice_permitted, 

55 choices_size, 

56) 

57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time 

58from hypothesis.internal.conjecture.providers import ( 

59 COLLECTION_DEFAULT_MAX_SIZE, 

60 HypothesisProvider, 

61 PrimitiveProvider, 

62) 

63from hypothesis.internal.conjecture.utils import calc_label_from_name 

64from hypothesis.internal.escalation import InterestingOrigin 

65from hypothesis.internal.floats import ( 

66 SMALLEST_SUBNORMAL, 

67 float_to_int, 

68 int_to_float, 

69 sign_aware_lte, 

70) 

71from hypothesis.internal.intervalsets import IntervalSet 

72from hypothesis.internal.observability import PredicateCounts 

73from hypothesis.reporting import debug_report 

74from hypothesis.utils.conventions import not_set 

75from hypothesis.utils.threading import ThreadLocal 

76 

77if TYPE_CHECKING: 

78 from typing import TypeAlias 

79 

80 from hypothesis.strategies import SearchStrategy 

81 from hypothesis.strategies._internal.core import DataObject 

82 from hypothesis.strategies._internal.random import RandomState 

83 from hypothesis.strategies._internal.strategies import Ex 

84 

85 

86def __getattr__(name: str) -> Any: 

87 if name == "AVAILABLE_PROVIDERS": 

88 from hypothesis._settings import note_deprecation 

89 from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS 

90 

91 note_deprecation( 

92 "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to " 

93 "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.", 

94 since="2025-01-25", 

95 has_codemod=False, 

96 stacklevel=1, 

97 ) 

98 return AVAILABLE_PROVIDERS 

99 

100 raise AttributeError( 

101 f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}" 

102 ) 

103 

104 

T = TypeVar("T")
# Targeted-search observations: maps a target label to its observed score.
# Aggregated by the runner and fed to targeted property-based testing.
TargetObservations = dict[str, Union[int, float]]
# index, choice_type, constraints, forced value
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label for the span covering the entire choice sequence of a test case.
TOP_LABEL = calc_label_from_name("top")
MAX_DEPTH = 100

# Per-thread counter used to stamp each ConjectureData with a unique
# ``testcounter`` (see ConjectureData.__init__).
threadlocal = ThreadLocal(global_test_counter=int)

116 

117 

class ExtraInformation:
    """Shared mutable scratch space attached to a ``ConjectureData`` whose
    contents are carried over onto the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        fields = ", ".join(f"{key}={value!r}" for key, value in self.__dict__.items())
        return f"ExtraInformation({fields})"

    def has_information(self) -> bool:
        # Any attribute ever set on this object counts as information.
        return len(self.__dict__) > 0

129 

130 

class Status(IntEnum):
    """Outcome of a test-case run, stored on ConjectureData/ConjectureResult."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return "Status." + self.name

139 

140 

@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    """Frozen, hashable marker identifying a structural part of the search
    space by its integer label; interned via ``structural_coverage``."""

    label: int = attr.ib()


# Interning table for StructuralCoverageTag instances, keyed by label.
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}

147 

148 

def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the canonical (interned) ``StructuralCoverageTag`` for ``label``.

    Interning means tags compare and hash cheaply, and each label only ever
    allocates one tag object. ``setdefault`` keeps the insert race-free.
    """
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag

154 

155 

# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# NOTE(review): keys appear to be tuples built from choice type + constraint
# values (used by ConjectureData._pooled_constraints, not shown here) — verify.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)

159 

160 

class Span:
    """A single node in the hierarchical structure of a test run's choices.

    Spans mark regions of the choice sequence that logically belong
    together. Hypothesis records one top-level span for the whole run, one
    span per strategy draw, and some strategies add further internal spans
    (for example, ``st.lists`` separates its "should add another element"
    choices from its element draws). The shrinker, the mutator, targeted
    property-based testing, and other subsystems all rely on this structure.

    A ``Span`` deliberately stores nothing but an index into its owning
    ``Spans`` collection. Keeping per-span data in compact integer columns
    on the owner means most properties never need dedicated storage, and
    the footprint is far smaller than one rich object per span — at the
    cost of repeatedly re-allocating small ``Span``/``int`` objects on
    access, a trade-off that usually pays off handsomely in memory.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        # Spans from different owners are never equal, even at equal indices.
        return (self.owner is other.owner) and (self.index == other.index)

    def __ne__(self, other: object) -> bool:
        eq = self.__eq__(other)
        # Propagate NotImplemented so Python can try the reflected operation.
        return eq if eq is NotImplemented else not eq

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value associating this span with its approximate
        origin, such as a particular strategy class or kind of draw."""
        label_index = self.owner.label_indices[self.index]
        return self.owner.labels[label_index]

    @property
    def parent(self) -> Optional[int]:
        """Index of the span this one is nested directly within, or
        ``None`` for the top-level span."""
        return None if self.index == 0 else self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Choice index at which this span begins."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Choice index one past this span's last choice."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Nesting depth within the span tree; the top-level span is 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """Whether this span's ``stop_span`` call had ``discard=True`` —
        i.e. we believe the shrinker can delete it wholesale without
        changing the value produced by its enclosing strategy. Typically
        set when a rejection sampler rejects a value and retries."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """Number of choices contained in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans whose parent is this span, in increasing index order."""
        return [self.owner[child] for child in self.owner.children[self.index]]

260 

261 

class SpanProperty:
    """There are many properties of spans that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in SpanProperty.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, spans: "Spans"):
        # Indices of the spans currently open during replay, innermost last.
        self.span_stack: list[int] = []
        self.spans = spans
        # How many spans have been started so far during the replay.
        self.span_count = 0
        # How many choices have been replayed so far.
        self.choice_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        # Replay the compact trail encoding; see TrailType for the scheme.
        for record in self.spans.trail:
            if record == TrailType.STOP_SPAN_DISCARD:
                self.__pop(discarded=True)
            elif record == TrailType.STOP_SPAN_NO_DISCARD:
                self.__pop(discarded=False)
            elif record == TrailType.CHOICE:
                self.choice_count += 1
            else:
                # everything after TrailType.CHOICE is the label of a span start.
                self.__push(record - TrailType.CHOICE - 1)

        return self.finish()

    def __push(self, label_index: int) -> None:
        # Open span number ``i`` (spans are numbered in start order).
        i = self.span_count
        assert i < len(self.spans)
        # Note: start_span is called before ``i`` is pushed, so the stack
        # holds only the *enclosing* spans during the callback.
        self.start_span(i, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        # Close the innermost open span.
        i = self.span_stack.pop()
        self.stop_span(i, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError

316 

317 

class TrailType(IntEnum):
    """Event markers making up the compact trail encoding in ``SpanRecord``."""

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.

325 

326 

class SpanRecord:
    """Records the series of ``start_span``, ``stop_span``, and
    ``draw_bits`` calls so that these may be stored in ``Spans`` and
    replayed when we need to know about the structure of individual
    ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        # Maps each label to its position in self.labels; dropped on freeze.
        self.__label_positions: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # After freezing no further spans may be started.
        self.__label_positions = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__label_positions is not None
        position = self.__label_positions.get(label)
        if position is None:
            position = self.__label_positions.setdefault(label, len(self.labels))
            self.labels.append(label)
        # Span starts are encoded as label-index offset past TrailType.CHOICE.
        self.trail.append(TrailType.CHOICE + 1 + position)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)

364 

365 

class _starts_and_ends(SpanProperty):
    """Computes, for every span, the choice indices where it starts and ends."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        size = len(self.spans)
        self.starts = IntList.of_length(size)
        self.ends = IntList.of_length(size)

    def start_span(self, i: int, label_index: int) -> None:
        # The span begins at however many choices have been replayed so far.
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)

380 

381 

class _discarded(SpanProperty):
    """Collects the set of span indices that were discarded."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.discarded_indices: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.discarded_indices.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.discarded_indices)

393 

394 

class _parentage(SpanProperty):
    """Computes, for every span, the index of its parent span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.parents = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if i == 0:
            # The top-level span has no parent; its entry stays zero.
            return
        # The innermost still-open span is this span's parent.
        self.parents[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.parents

406 

407 

class _depths(SpanProperty):
    """Computes the nesting depth of every span; the top-level span is 0."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.depths = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # Depth equals the number of spans already open around this one.
        self.depths[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.depths

418 

419 

class _label_indices(SpanProperty):
    """Records, for every span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.indices = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.indices[i] = label_index

    def finish(self) -> IntList:
        return self.indices

430 

431 

class _mutator_groups(SpanProperty):
    """Groups spans by label so the mutator can swap structurally-similar
    regions of the choice sequence with one another."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group with a single span gives the mutator nothing to swap with.
        return [group for group in self.groups.values() if len(group) > 1]

448 

449 

class Spans:
    """A lazily-computed collection of ``Span`` objects.

    Logically this behaves like a list of ``Span`` values, but it is really
    a compact columnar store derived from the events recorded in a
    ``SpanRecord``. Each ``Span`` is merely an index into this object, and
    every per-span attribute (start, end, depth, label, ...) is computed on
    first access — for all spans at once — and cached here.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Every span produces exactly one stop marker, so counting the stop
        # markers tells us how many spans exist.
        n_discarded = self.trail.count(TrailType.STOP_SPAN_DISCARD)
        n_kept = self.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__length = n_discarded + n_kept
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            child_lists = [IntList() for _ in range(len(self))]
            for index, parent in enumerate(self.parentage):
                if index > 0:
                    child_lists[parent].append(index)
            # Childless spans get a shared empty tuple rather than an empty
            # IntList, to reduce memory usage.
            self.__children = [c if c else () for c in child_lists]  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if not (-n <= i < n):
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        # Negative indices wrap around, as for ordinary sequences.
        return Span(self, i + n if i < 0 else i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for index in range(len(self)):
            yield self[index]

532 

533 

class _Overrun:
    """Sentinel result type standing in for a run that consumed more data
    than allowed; always used via the shared ``Overrun`` singleton below."""

    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# All overruns are indistinguishable, so a single shared instance suffices.
Overrun = _Overrun()

542 

543 

class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache.

    All hooks are no-ops by default; subclasses override the ones they
    care about.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Called with each integer drawn by the observed data; no-op hook."""
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Called with each float drawn by the observed data; no-op hook."""
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Called with each string drawn by the observed data; no-op hook."""
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Called with each bytes value drawn by the observed data; no-op hook."""
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Called with each boolean drawn by the observed data; no-op hook."""
        pass

587 

588 

@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness.

    Fields mirror the identically-named attributes on ``ConjectureData``.
    """

    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # The recorded choice sequence; excluded from equality and repr
    # (eq=False, repr=False) since it can be large.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    # Span structure of the run; excluded from repr and equality.
    spans: Spans = attr.ib(repr=False, eq=False)
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # Set when a replayed prefix disagreed with the requested draw; see
    # the MisalignedAt alias for the tuple layout.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        """Return self; results are already in their final form."""
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The raw choice values, in draw order."""
        return tuple(node.value for node in self.nodes)

618 

619 

620class ConjectureData: 

    @classmethod
    def for_choices(
        cls,
        choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
        *,
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        random: Optional[Random] = None,
    ) -> "ConjectureData":
        """Construct a ConjectureData which replays the given choice sequence.

        ``max_choices`` is set to the number of choices in ``choices``, so
        drawing past the end of the prefix marks the data as overrun rather
        than generating novel values (which would require ``random``).
        """
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )

639 

    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set up a fresh ConjectureData.

        Args:
            random: randomness source for novel draws; may be None when every
                choice will come from ``prefix`` (see ``for_choices``).
            observer: receives a callback for each observed choice.
            provider: either a PrimitiveProvider instance, or a class which
                will be instantiated as ``provider(self, **provider_kw)``.
            prefix: choices to replay before generating anything new.
            max_choices: mark the data overrun once this many choices exist.
            provider_kw: keyword arguments for ``provider``; only valid when
                ``provider`` is a class.
        """
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense if we are the ones instantiating.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Unique (per-thread) sequence number for this test case.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        # Assorted per-run state used by specific strategies.
        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self._shared_data_strategy: Optional[DataObject] = None
        self._stateful_repr_parts: Optional[list[Any]] = None
        self.states_for_ids: Optional[dict[int, RandomState]] = None
        self.seeds_to_states: Optional[dict[Any, RandomState]] = None
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the span covering the entire choice sequence.
        self.start_span(TOP_LABEL)

739 

740 def __repr__(self) -> str: 

741 return "ConjectureData(%s, %d choices%s)" % ( 

742 self.status.name, 

743 len(self.nodes), 

744 ", frozen" if self.frozen else "", 

745 ) 

746 

747 @property 

748 def choices(self) -> tuple[ChoiceT, ...]: 

749 return tuple(node.value for node in self.nodes) 

750 

    # draw_* functions might be called in one of two contexts: either "above" or
    # "below" the choice sequence. For instance, draw_string calls draw_boolean
    # from ``many`` when calculating the number of characters to return. We do
    # not want these choices to get written to the choice sequence, because they
    # are not true choices themselves.
    #
    # `observe` formalizes this. The choice will only be written to the choice
    # sequence if observe is True.

    # Typed overloads so each choice_type literal maps to its concrete
    # constraints and return types; the implementation follows below.
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...

809 

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Draw one choice of ``choice_type``, recording it iff ``observe``.

        The value comes from, in priority order: the unconsumed ``prefix``
        (replay), the provider (fresh generation; only when ``forced`` is
        None), or ``forced`` itself. Marks the data as overrun when the
        choice would exceed ``max_length`` or ``max_choices``.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        # A forced value always wins, even over a replayed prefix value.
        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            # Notify the observer before committing the node.
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value

885 

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> int:
        """Draw an integer, optionally bounded and weighted.

        ``weights`` maps specific values to probability mass; it requires
        both bounds, at most 255 entries, strictly positive weights, and a
        total below 1. ``forced`` pins the result while still recording the
        choice. Validation is by assertion: this is an internal API.
        """
        # Validate arguments
        if weights is not None:
            assert min_value is not None
            assert max_value is not None
            assert len(weights) <= 255  # arbitrary practical limit
            # We can and should eventually support total weights. But this
            # complicates shrinking as we can no longer assume we can force
            # a value to the unmapped probability mass if that mass might be 0.
            assert sum(weights.values()) < 1
            # similarly, things get simpler if we assume every value is possible.
            # we'll want to drop this restriction eventually.
            assert all(w != 0 for w in weights.values())

        if forced is not None and min_value is not None:
            assert min_value <= forced
        if forced is not None and max_value is not None:
            assert forced <= max_value

        constraints: IntegerConstraints = self._pooled_constraints(
            "integer",
            {
                "min_value": min_value,
                "max_value": max_value,
                "weights": weights,
                "shrink_towards": shrink_towards,
            },
        )
        return self._draw("integer", constraints, observe=observe, forced=forced)

924 

    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: Optional[float] = None,
        observe: bool = True,
    ) -> float:
        """Draw a float within sign-aware bounds.

        ``forced`` must be nan (when permitted by ``allow_nan``) or lie
        within [min_value, max_value] under sign-aware comparison, which
        distinguishes -0.0 from 0.0. Validation is by assertion: this is an
        internal API.
        """
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)

968 

969 def draw_string( 

970 self, 

971 intervals: IntervalSet, 

972 *, 

973 min_size: int = 0, 

974 max_size: int = COLLECTION_DEFAULT_MAX_SIZE, 

975 forced: Optional[str] = None, 

976 observe: bool = True, 

977 ) -> str: 

978 assert forced is None or min_size <= len(forced) <= max_size 

979 assert min_size >= 0 

980 if len(intervals) == 0: 

981 assert min_size == 0 

982 

983 constraints: StringConstraints = self._pooled_constraints( 

984 "string", 

985 { 

986 "intervals": intervals, 

987 "min_size": min_size, 

988 "max_size": max_size, 

989 }, 

990 ) 

991 return self._draw("string", constraints, observe=observe, forced=forced) 

992 

993 def draw_bytes( 

994 self, 

995 min_size: int = 0, 

996 max_size: int = COLLECTION_DEFAULT_MAX_SIZE, 

997 *, 

998 forced: Optional[bytes] = None, 

999 observe: bool = True, 

1000 ) -> bytes: 

1001 assert forced is None or min_size <= len(forced) <= max_size 

1002 assert min_size >= 0 

1003 

1004 constraints: BytesConstraints = self._pooled_constraints( 

1005 "bytes", {"min_size": min_size, "max_size": max_size} 

1006 ) 

1007 return self._draw("bytes", constraints, observe=observe, forced=forced) 

1008 

1009 def draw_boolean( 

1010 self, 

1011 p: float = 0.5, 

1012 *, 

1013 forced: Optional[bool] = None, 

1014 observe: bool = True, 

1015 ) -> bool: 

1016 assert (forced is not True) or p > 0 

1017 assert (forced is not False) or p < 1 

1018 

1019 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p}) 

1020 return self._draw("boolean", constraints, observe=observe, forced=forced) 

1021 

1022 @overload 

1023 def _pooled_constraints( 

1024 self, choice_type: Literal["integer"], constraints: IntegerConstraints 

1025 ) -> IntegerConstraints: ... 

1026 

1027 @overload 

1028 def _pooled_constraints( 

1029 self, choice_type: Literal["float"], constraints: FloatConstraints 

1030 ) -> FloatConstraints: ... 

1031 

1032 @overload 

1033 def _pooled_constraints( 

1034 self, choice_type: Literal["string"], constraints: StringConstraints 

1035 ) -> StringConstraints: ... 

1036 

1037 @overload 

1038 def _pooled_constraints( 

1039 self, choice_type: Literal["bytes"], constraints: BytesConstraints 

1040 ) -> BytesConstraints: ... 

1041 

1042 @overload 

1043 def _pooled_constraints( 

1044 self, choice_type: Literal["boolean"], constraints: BooleanConstraints 

1045 ) -> BooleanConstraints: ... 

1046 

1047 def _pooled_constraints( 

1048 self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT 

1049 ) -> ChoiceConstraintsT: 

1050 """Memoize common dictionary objects to reduce memory pressure.""" 

1051 # caching runs afoul of nondeterminism checks 

1052 if self.provider.avoid_realization: 

1053 return constraints 

1054 

1055 key = (choice_type, *choice_constraints_key(choice_type, constraints)) 

1056 try: 

1057 return POOLED_CONSTRAINTS_CACHE[key] 

1058 except KeyError: 

1059 POOLED_CONSTRAINTS_CACHE[key] = constraints 

1060 return constraints 

1061 

    def _pop_choice(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Take the next value out of ``self.prefix`` for a draw of
        ``choice_type`` under ``constraints``.

        Handles both concrete prefix values and trailing ``ChoiceTemplate``
        entries, and repairs "misaligned" draws where the prefix value does
        not match what the strategy is currently asking for.
        """
        assert self.prefix is not None
        # checked in _draw
        assert self.index < len(self.prefix)

        value = self.prefix[self.index]
        if isinstance(value, ChoiceTemplate):
            node: ChoiceTemplate = value
            if node.count is not None:
                assert node.count >= 0
            # node templates have to be at the end for now, since it's not immediately
            # apparent how to handle overruning a node template while generating a single
            # node if the alternative is not "the entire data is an overrun".
            assert self.index == len(self.prefix) - 1
            if node.type == "simplest":
                if forced is not None:
                    choice = forced
                # NOTE(review): when ``forced`` is not None, the assignment above is
                # immediately overwritten by choice_from_index below — confirm that
                # the caller (_draw) re-applies ``forced`` afterwards, or an ``else``
                # is missing here.
                try:
                    choice = choice_from_index(0, choice_type, constraints)
                except ChoiceTooLarge:
                    self.mark_overrun()
            else:
                raise NotImplementedError

            # Each use of a counted template consumes one unit of its budget;
            # exhausting it means the test case is an overrun.
            if node.count is not None:
                node.count -= 1
                if node.count < 0:
                    self.mark_overrun()
            return choice

        choice = value
        # Map the concrete prefix value back to its choice-sequence type name.
        node_choice_type = {
            str: "string",
            float: "float",
            int: "integer",
            bool: "boolean",
            bytes: "bytes",
        }[type(choice)]
        # If we're trying to:
        # * draw a different choice type at the same location
        # * draw the same choice type with a different constraints, which does not permit
        #   the current value
        #
        # then we call this a misalignment, because the choice sequence has
        # changed from what we expected at some point. An easy misalignment is
        #
        #   one_of(integers(0, 100), integers(101, 200))
        #
        # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
        # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
        # index 1 (which does not permit any of the values 0-100).
        #
        # When the choice sequence becomes misaligned, we generate a new value of the
        # type and constraints the strategy expects.
        if node_choice_type != choice_type or not choice_permitted(choice, constraints):
            # only track first misalignment for now.
            if self.misaligned_at is None:
                self.misaligned_at = (self.index, choice_type, constraints, forced)
            try:
                # Fill in any misalignments with index 0 choices. An alternative to
                # this is using the index of the misaligned choice instead
                # of index 0, which may be useful for maintaining
                # "similarly-complex choices" in the shrinker. This requires
                # attaching an index to every choice in ConjectureData.for_choices,
                # which we don't always have (e.g. when reading from db).
                #
                # If we really wanted this in the future we could make this complexity
                # optional, use it if present, and default to index 0 otherwise.
                # This complicates our internal api and so I'd like to avoid it
                # if possible.
                #
                # Additionally, I don't think slips which require
                # slipping to high-complexity values are common. Though arguably
                # we may want to expand a bit beyond *just* the simplest choice.
                # (we could for example consider sampling choices from index 0-10).
                choice = choice_from_index(0, choice_type, constraints)
            except ChoiceTooLarge:
                # should really never happen with a 0-index choice, but let's be safe.
                self.mark_overrun()

        self.index += 1
        return choice

1150 

1151 def as_result(self) -> Union[ConjectureResult, _Overrun]: 

1152 """Convert the result of running this test into 

1153 either an Overrun object or a ConjectureResult.""" 

1154 

1155 assert self.frozen 

1156 if self.status == Status.OVERRUN: 

1157 return Overrun 

1158 if self.__result is None: 

1159 self.__result = ConjectureResult( 

1160 status=self.status, 

1161 interesting_origin=self.interesting_origin, 

1162 spans=self.spans, 

1163 nodes=self.nodes, 

1164 length=self.length, 

1165 output=self.output, 

1166 expected_traceback=self.expected_traceback, 

1167 expected_exception=self.expected_exception, 

1168 extra_information=( 

1169 self.extra_information 

1170 if self.extra_information.has_information() 

1171 else None 

1172 ), 

1173 has_discards=self.has_discards, 

1174 target_observations=self.target_observations, 

1175 tags=frozenset(self.tags), 

1176 arg_slices=self.arg_slices, 

1177 slice_comments=self.slice_comments, 

1178 misaligned_at=self.misaligned_at, 

1179 cannot_proceed_scope=self.cannot_proceed_scope, 

1180 ) 

1181 assert self.__result is not None 

1182 return self.__result 

1183 

1184 def __assert_not_frozen(self, name: str) -> None: 

1185 if self.frozen: 

1186 raise Frozen(f"Cannot call {name} on frozen ConjectureData") 

1187 

1188 def note(self, value: Any) -> None: 

1189 self.__assert_not_frozen("note") 

1190 if not isinstance(value, str): 

1191 value = repr(value) 

1192 self.output += value 

1193 

    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from ``strategy``, wrapping the draw in a span.

        Top-level draws (depth 0) additionally record wall-clock timing
        (minus GC time) under ``observe_as`` or an auto-generated key, and
        feed the drawn value to observability when enabled.
        """
        from hypothesis.internal.observability import observability_enabled
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws skip the timing/observability bookkeeping.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Attach generation context to any error raised during the draw,
                # so failures point at the responsible strategy.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if observability_enabled():
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            # Always close the span, even when the draw raised.
            self.stop_span()

1255 

    def start_span(self, label: int) -> None:
        """Open a new span labelled ``label``, increasing the current depth.

        The provider is notified first so backends can track span structure
        even for draws that later fail the frozen check.
        """
        self.provider.span_start(label)
        self.__assert_not_frozen("start_span")
        self.depth += 1
        # Logically it would make sense for this to just be
        # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
        # be until we ran the code under tracemalloc and found a rather significant
        # chunk of allocation was happening here. This was presumably due to varargs
        # or the like, but we didn't investigate further given that it was easy
        # to fix with this check.
        if self.depth > self.max_depth:
            self.max_depth = self.depth
        self.__span_record.start_span(label)
        self.labels_for_structure_stack.append({label})

1270 

    def stop_span(self, *, discard: bool = False) -> None:
        """Close the innermost open span.

        ``discard=True`` marks the span's draws as irrelevant to the final
        result, which also prunes the exploration tree (see below). Safe to
        call after freezing: the provider is still notified, but no local
        state changes.
        """
        self.provider.span_end(discard)
        if self.frozen:
            return
        if discard:
            self.has_discards = True
        self.depth -= 1
        assert self.depth >= -1
        self.__span_record.stop_span(discard=discard)

        labels_for_structure = self.labels_for_structure_stack.pop()

        if not discard:
            # Propagate this span's labels to the parent span, or convert them
            # to structural-coverage tags when closing the outermost span.
            if self.labels_for_structure_stack:
                self.labels_for_structure_stack[-1].update(labels_for_structure)
            else:
                self.tags.update([structural_coverage(l) for l in labels_for_structure])

        if discard:
            # Once we've discarded a span, every test case starting with
            # this prefix contains discards. We prune the tree at that point so
            # as to avoid future test cases bothering with this region, on the
            # assumption that some span that you could have used instead
            # there would *not* trigger the discard. This greatly speeds up
            # test case generation in some cases, because it allows us to
            # ignore large swathes of the search space that are effectively
            # redundant.
            #
            # A scenario that can cause us problems but which we deliberately
            # have decided not to support is that if there are side effects
            # during data generation then you may end up with a scenario where
            # every good test case generates a discard because the discarded
            # section sets up important things for later. This is not terribly
            # likely and all that you see in this case is some degradation in
            # quality of testing, so we don't worry about it.
            #
            # Note that killing the branch does *not* mean we will never
            # explore below this point, and in particular we may do so during
            # shrinking. Any explicit request for a data object that starts
            # with the branch here will work just fine, but novel prefix
            # generation will avoid it, and we can use it to detect when we
            # have explored the entire tree (up to redundancy).

            self.observer.kill_branch()

1315 

1316 @property 

1317 def spans(self) -> Spans: 

1318 assert self.frozen 

1319 if self.__spans is None: 

1320 self.__spans = Spans(record=self.__span_record) 

1321 return self.__spans 

1322 

    def freeze(self) -> None:
        """Finalize this test case: record end times, close any open spans,
        and notify the observer. Idempotent — subsequent calls are no-ops.
        """
        if self.frozen:
            return
        # Capture timing before span-closing work, so it isn't counted.
        self.finish_time = time.perf_counter()
        self.gc_finish_time = gc_cumulative_time()

        # Always finish by closing all remaining spans so that we have a valid tree.
        while self.depth >= 0:
            self.stop_span()

        self.__span_record.freeze()
        # Set the flag before notifying the observer, so any re-entrant calls
        # see a frozen data object.
        self.frozen = True
        self.observer.conclude_test(self.status, self.interesting_origin)

1336 

1337 def choice( 

1338 self, 

1339 values: Sequence[T], 

1340 *, 

1341 forced: Optional[T] = None, 

1342 observe: bool = True, 

1343 ) -> T: 

1344 forced_i = None if forced is None else values.index(forced) 

1345 i = self.draw_integer( 

1346 0, 

1347 len(values) - 1, 

1348 forced=forced_i, 

1349 observe=observe, 

1350 ) 

1351 return values[i] 

1352 

1353 def conclude_test( 

1354 self, 

1355 status: Status, 

1356 interesting_origin: Optional[InterestingOrigin] = None, 

1357 ) -> NoReturn: 

1358 assert (interesting_origin is None) or (status == Status.INTERESTING) 

1359 self.__assert_not_frozen("conclude_test") 

1360 self.interesting_origin = interesting_origin 

1361 self.status = status 

1362 self.freeze() 

1363 raise StopTest(self.testcounter) 

1364 

1365 def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn: 

1366 self.conclude_test(Status.INTERESTING, interesting_origin) 

1367 

1368 def mark_invalid(self, why: Optional[str] = None) -> NoReturn: 

1369 if why is not None: 

1370 self.events["invalid because"] = why 

1371 self.conclude_test(Status.INVALID) 

1372 

1373 def mark_overrun(self) -> NoReturn: 

1374 self.conclude_test(Status.OVERRUN) 

1375 

1376 

def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single random choice of ``choice_type`` under ``constraints``,
    using a throwaway ConjectureData seeded from ``random``."""
    data = ConjectureData(random=random)
    draw_fn = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_fn(**constraints))