Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/data.py: 63%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

623 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import math 

12import time 

13from collections import defaultdict 

14from collections.abc import Hashable, Iterable, Iterator, Sequence 

15from enum import IntEnum 

16from functools import cached_property 

17from random import Random 

18from typing import ( 

19 TYPE_CHECKING, 

20 Any, 

21 Literal, 

22 NoReturn, 

23 Optional, 

24 TypeVar, 

25 Union, 

26 cast, 

27 overload, 

28) 

29 

30import attr 

31 

32from hypothesis.errors import ( 

33 CannotProceedScopeT, 

34 ChoiceTooLarge, 

35 Frozen, 

36 InvalidArgument, 

37 StopTest, 

38) 

39from hypothesis.internal.cache import LRUCache 

40from hypothesis.internal.compat import add_note 

41from hypothesis.internal.conjecture.choice import ( 

42 BooleanConstraints, 

43 BytesConstraints, 

44 ChoiceConstraintsT, 

45 ChoiceNode, 

46 ChoiceT, 

47 ChoiceTemplate, 

48 ChoiceTypeT, 

49 FloatConstraints, 

50 IntegerConstraints, 

51 StringConstraints, 

52 choice_constraints_key, 

53 choice_from_index, 

54 choice_permitted, 

55 choices_size, 

56) 

57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time 

58from hypothesis.internal.conjecture.providers import ( 

59 COLLECTION_DEFAULT_MAX_SIZE, 

60 HypothesisProvider, 

61 PrimitiveProvider, 

62) 

63from hypothesis.internal.conjecture.utils import calc_label_from_name 

64from hypothesis.internal.escalation import InterestingOrigin 

65from hypothesis.internal.floats import ( 

66 SMALLEST_SUBNORMAL, 

67 float_to_int, 

68 int_to_float, 

69 sign_aware_lte, 

70) 

71from hypothesis.internal.intervalsets import IntervalSet 

72from hypothesis.internal.observability import PredicateCounts 

73from hypothesis.reporting import debug_report 

74from hypothesis.utils.conventions import not_set 

75from hypothesis.utils.threading import ThreadLocal 

76 

77if TYPE_CHECKING: 

78 from typing import TypeAlias 

79 

80 from hypothesis.strategies import SearchStrategy 

81 from hypothesis.strategies._internal.strategies import Ex 

82 

83 

84def __getattr__(name: str) -> Any: 

85 if name == "AVAILABLE_PROVIDERS": 

86 from hypothesis._settings import note_deprecation 

87 from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS 

88 

89 note_deprecation( 

90 "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to " 

91 "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.", 

92 since="2025-01-25", 

93 has_codemod=False, 

94 stacklevel=1, 

95 ) 

96 return AVAILABLE_PROVIDERS 

97 

98 raise AttributeError( 

99 f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}" 

100 ) 

101 

102 

# Generic type variable used for typed draw helpers in this module.
T = TypeVar("T")
# Mapping of target label -> observed score, aggregated for targeted PBT.
TargetObservations = dict[str, Union[int, float]]

# index, choice_type, constraints, forced value
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label of the single top-level span wrapping every test case's choices.
TOP_LABEL = calc_label_from_name("top")
# NOTE(review): presumably a cap on nesting depth of draws/spans — the
# usage site is not visible in this chunk; confirm before relying on it.
MAX_DEPTH = 100

# Per-thread counter state: each ConjectureData takes (and increments) a
# unique ``global_test_counter`` value in its __init__.
threadlocal = ThreadLocal(global_test_counter=int)

114 

115 

class ExtraInformation:
    """A class for holding shared state on a ``ConjectureData`` that should
    be added to the final ``ConjectureResult``.

    Arbitrary attributes may be set on an instance; they are surfaced via
    ``__repr__`` and detected via ``has_information``.
    """

    def __repr__(self) -> str:
        rendered = ", ".join(
            f"{name}={value!r}" for name, value in self.__dict__.items()
        )
        return f"ExtraInformation({rendered})"

    def has_information(self) -> bool:
        """True iff any attribute has been stored on this instance."""
        return bool(self.__dict__)

127 

128 

class Status(IntEnum):
    # Outcome of a single ConjectureData run. IntEnum so that statuses can
    # be compared and ordered (OVERRUN < INVALID < VALID < INTERESTING).
    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return f"Status.{self.name}"

137 

138 

@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    """Hashable tag marking which part of the search space an example is in.

    Instances are interned via ``structural_coverage`` below; frozen so they
    can safely be stored in sets (see ``ConjectureData.tags``).
    """

    label: int = attr.ib()

142 

143 

STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the canonical (cached) ``StructuralCoverageTag`` for ``label``.

    Uses ``setdefault`` on a miss so that concurrent callers racing on the
    same label still agree on a single canonical tag instance.
    """
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag

152 

153 

# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Keyed by (choice_type, constraint values...) — see the _pooled_constraints
# call sites below; values are the shared, deduplicated constraints dicts.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)

157 

158 

class Span:
    """A span tracks the hierarchical structure of choices within a single test run.

    Spans mark regions of the choice sequence that are logically related to
    each other. For instance, Hypothesis tracks:

    - A single top-level span for the entire choice sequence
    - A span for the choices made by each strategy
    - Additional spans defined by some strategies. st.lists(), for example,
      records the "should add another element" choice and each "add another
      element" region as separate spans.

    Spans provide useful information to the shrinker, mutator, targeted PBT,
    and other subsystems of Hypothesis.

    A ``Span`` is deliberately lightweight: rather than a rich object, it is
    just an index into the ``Spans`` collection defined below. This has two
    benefits. First, most properties are never needed for most spans, so no
    per-property storage is allocated at all. Second, packed integer lists
    are far smaller than ordinary Python objects, saving considerable memory.
    The trade-off is extra allocation of small ``Span``/``int`` objects on
    repeated access, which can slow some usage patterns, but the memory win
    is usually dramatic and worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if other is self:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        return (self.index == other.index) and (self.owner is other.owner)

    def __ne__(self, other: object) -> bool:
        if other is self:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return (self.index != other.index) or (self.owner is not other.owner)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value that associates this span with its approximate
        origin, such as a particular strategy class or a particular kind
        of draw."""
        owner = self.owner
        return owner.labels[owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """The index of the span this one is nested directly within, or
        ``None`` for the top-level span."""
        return None if self.index == 0 else self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Choice-sequence index at which this span begins."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Choice-sequence index one past this span's last choice."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Depth of this span in the span tree. The top-level span has a
        depth of 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``. This means we believe the shrinker should be able to delete
        this span completely, without affecting the value produced by its
        enclosing strategy. Typically set when a rejection sampler decides
        to reject a generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans which have this span as a parent, in increasing index
        order."""
        owner = self.owner
        return [owner[i] for i in owner.children[self.index]]

258 

259 

class SpanProperty:
    """There are many properties of spans that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in SpanProperty.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, spans: "Spans"):
        # Indices of currently-open spans, innermost last.
        self.span_stack: list[int] = []
        self.spans = spans
        # Number of spans started so far; doubles as the next span's index.
        self.span_count = 0
        # Number of choice records seen so far.
        self.choice_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        # Decode the compact trail: the order of these checks matters, as
        # every value greater than TrailType.CHOICE encodes a span start.
        for record in self.spans.trail:
            if record == TrailType.STOP_SPAN_DISCARD:
                self.__pop(discarded=True)
            elif record == TrailType.STOP_SPAN_NO_DISCARD:
                self.__pop(discarded=False)
            elif record == TrailType.CHOICE:
                self.choice_count += 1
            else:
                # everything after TrailType.CHOICE is the label of a span start.
                self.__push(record - TrailType.CHOICE - 1)

        return self.finish()

    def __push(self, label_index: int) -> None:
        i = self.span_count
        assert i < len(self.spans)
        self.start_span(i, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        i = self.span_stack.pop()
        self.stop_span(i, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        """Return the computed property; must be overridden by subclasses."""
        raise NotImplementedError

314 

315 

class TrailType(IntEnum):
    # Markers recorded in ``SpanRecord.trail``. The two stop markers also
    # encode whether the stopped span was discarded.
    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.

323 

324 

class SpanRecord:
    """Records the series of ``start_span``, ``stop_span``, and
    ``draw_bits`` calls so that these may be stored in ``Spans`` and
    replayed when we need to know about the structure of individual
    ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        # Maps each label to its position in ``self.labels``; set to None
        # by ``freeze`` so further span starts fail the assertion below.
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        index_of = self.__index_of_labels
        assert index_of is not None
        if label in index_of:
            i = index_of[label]
        else:
            i = index_of.setdefault(label, len(self.labels))
            self.labels.append(label)
        # Span starts are encoded as the label's index offset past CHOICE.
        self.trail.append(TrailType.CHOICE + 1 + i)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)

362 

363 

class _starts_and_ends(SpanProperty):
    """Computes, for every span, the choice indices at which it starts and ends."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        size = len(self.spans)
        self.starts = IntList.of_length(size)
        self.ends = IntList.of_length(size)

    def start_span(self, i: int, label_index: int) -> None:
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)

378 

379 

class _discarded(SpanProperty):
    """Collects the indices of all spans stopped with ``discard=True``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)

391 

392 

class _parentage(SpanProperty):
    """Computes, for every span except the root, the index of its parent span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # The root (index 0) has no parent; every other span's parent is
        # the span still open directly beneath it on the stack.
        if i > 0:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result

404 

405 

class _depths(SpanProperty):
    """Computes the tree depth of every span; the top-level span has depth 0."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # The stack holds exactly the currently-open enclosing spans.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result

416 

417 

class _label_indices(SpanProperty):
    """Records, for every span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result

428 

429 

class _mutator_groups(SpanProperty):
    """Groups spans by label, as (start, end) choice ranges, for the mutator."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group with a single span gives the mutator nothing to swap
        # with, so such groups are dropped.
        return [group for group in self.groups.values() if len(group) >= 2]

446 

447 

class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the record of recorded behaviour in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Every span contributes exactly one stop marker to the trail, so
        # the span count equals the number of stop records of either kind.
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        # Computed lazily by the ``children`` property below.
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        # Computed together in one trail replay, then split by the
        # ``starts``/``ends`` properties.
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        # Lazily invert ``parentage`` into per-span lists of child indices.
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        # Supports negative indices, matching list semantics.
        n = self.__length
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Span(self, i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]

530 

531 

class _Overrun:
    # Sentinel standing in for a result when a run drew more data than
    # allowed; exposed via the module-level ``Overrun`` singleton below.
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


Overrun = _Overrun()

540 

541 

class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache."""

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    # The draw_* hooks below are deliberate no-ops in the base class;
    # subclasses override whichever ones they care about.

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        pass

585 

586 

@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    # Final status of the run (OVERRUN / INVALID / VALID / INTERESTING).
    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # The recorded choice sequence; excluded from eq/repr for size reasons.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    # Target label -> observed score, for targeted property-based testing.
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # Where (if anywhere) replaying a prefix diverged from recorded choices.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Mirrors ConjectureData.as_result(); already a result, so identity.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        # The raw choice values, stripped of constraints/metadata.
        return tuple(node.value for node in self.nodes)

616 

617 

618class ConjectureData: 

    @classmethod
    def for_choices(
        cls,
        choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
        *,
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        random: Optional[Random] = None,
    ) -> "ConjectureData":
        """Alternate constructor: build a ConjectureData that replays the
        given ``choices`` as its prefix, with ``max_choices`` capped to
        exactly the number of choices supplied."""
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )

637 

    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set up a fresh test-case recorder.

        ``provider`` may be a PrimitiveProvider class (instantiated here,
        with ``provider_kw``) or an already-constructed instance, in which
        case ``provider_kw`` must not be passed.
        """
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense when we construct the provider.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Unique per-thread test counter; incremented for every data object.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # Instantiate the provider class if one was given; otherwise use
        # the supplied instance directly.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )
        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        # Choices to replay before drawing fresh values from the provider.
        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the top-level span wrapping the whole test case.
        self.start_span(TOP_LABEL)

733 

734 def __repr__(self) -> str: 

735 return "ConjectureData(%s, %d choices%s)" % ( 

736 self.status.name, 

737 len(self.nodes), 

738 ", frozen" if self.frozen else "", 

739 ) 

740 

741 @property 

742 def choices(self) -> tuple[ChoiceT, ...]: 

743 return tuple(node.value for node in self.nodes) 

744 

    # draw_* functions might be called in one of two contexts: either "above" or
    # "below" the choice sequence. For instance, draw_string calls draw_boolean
    # from ``many`` when calculating the number of characters to return. We do
    # not want these choices to get written to the choice sequence, because they
    # are not true choices themselves.
    #
    # `observe` formalizes this. The choice will only be written to the choice
    # sequence if observe is True.

    # The @overload stubs below map each choice_type literal to its matching
    # constraints and return type for the type checker; the real
    # implementation follows.

    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...

803 

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Core draw implementation shared by all draw_* methods.

        Resolves the value from (in priority order) the replay prefix, the
        ``forced`` argument, or the provider; then, if ``observe`` is True,
        records it as a node in the choice sequence.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        # A forced value wins even over a prefix value popped above.
        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            # Notify the observer (e.g. the tree cache) of the draw.
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value

879 

880 def draw_integer( 

881 self, 

882 min_value: Optional[int] = None, 

883 max_value: Optional[int] = None, 

884 *, 

885 weights: Optional[dict[int, float]] = None, 

886 shrink_towards: int = 0, 

887 forced: Optional[int] = None, 

888 observe: bool = True, 

889 ) -> int: 

890 # Validate arguments 

891 if weights is not None: 

892 assert min_value is not None 

893 assert max_value is not None 

894 assert len(weights) <= 255 # arbitrary practical limit 

895 # We can and should eventually support total weights. But this 

896 # complicates shrinking as we can no longer assume we can force 

897 # a value to the unmapped probability mass if that mass might be 0. 

898 assert sum(weights.values()) < 1 

899 # similarly, things get simpler if we assume every value is possible. 

900 # we'll want to drop this restriction eventually. 

901 assert all(w != 0 for w in weights.values()) 

902 

903 if forced is not None and min_value is not None: 

904 assert min_value <= forced 

905 if forced is not None and max_value is not None: 

906 assert forced <= max_value 

907 

908 constraints: IntegerConstraints = self._pooled_constraints( 

909 "integer", 

910 { 

911 "min_value": min_value, 

912 "max_value": max_value, 

913 "weights": weights, 

914 "shrink_towards": shrink_towards, 

915 }, 

916 ) 

917 return self._draw("integer", constraints, observe=observe, forced=forced) 

918 

    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: Optional[float] = None,
        observe: bool = True,
    ) -> float:
        """Draw a float choice, optionally bounded, nan-permitting, and/or forced."""
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        # SMALLEST_SUBNORMAL collapsing to 0.0 indicates broken subnormal
        # support in the host's floating point environment.
        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            # A forced non-nan value must lie within the (sign-aware) bounds.
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)

962 

963 def draw_string( 

964 self, 

965 intervals: IntervalSet, 

966 *, 

967 min_size: int = 0, 

968 max_size: int = COLLECTION_DEFAULT_MAX_SIZE, 

969 forced: Optional[str] = None, 

970 observe: bool = True, 

971 ) -> str: 

972 assert forced is None or min_size <= len(forced) <= max_size 

973 assert min_size >= 0 

974 if len(intervals) == 0: 

975 assert min_size == 0 

976 

977 constraints: StringConstraints = self._pooled_constraints( 

978 "string", 

979 { 

980 "intervals": intervals, 

981 "min_size": min_size, 

982 "max_size": max_size, 

983 }, 

984 ) 

985 return self._draw("string", constraints, observe=observe, forced=forced) 

986 

987 def draw_bytes( 

988 self, 

989 min_size: int = 0, 

990 max_size: int = COLLECTION_DEFAULT_MAX_SIZE, 

991 *, 

992 forced: Optional[bytes] = None, 

993 observe: bool = True, 

994 ) -> bytes: 

995 assert forced is None or min_size <= len(forced) <= max_size 

996 assert min_size >= 0 

997 

998 constraints: BytesConstraints = self._pooled_constraints( 

999 "bytes", {"min_size": min_size, "max_size": max_size} 

1000 ) 

1001 return self._draw("bytes", constraints, observe=observe, forced=forced) 

1002 

1003 def draw_boolean( 

1004 self, 

1005 p: float = 0.5, 

1006 *, 

1007 forced: Optional[bool] = None, 

1008 observe: bool = True, 

1009 ) -> bool: 

1010 assert (forced is not True) or p > 0 

1011 assert (forced is not False) or p < 1 

1012 

1013 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p}) 

1014 return self._draw("boolean", constraints, observe=observe, forced=forced) 

1015 

1016 @overload 

1017 def _pooled_constraints( 

1018 self, choice_type: Literal["integer"], constraints: IntegerConstraints 

1019 ) -> IntegerConstraints: ... 

1020 

1021 @overload 

1022 def _pooled_constraints( 

1023 self, choice_type: Literal["float"], constraints: FloatConstraints 

1024 ) -> FloatConstraints: ... 

1025 

1026 @overload 

1027 def _pooled_constraints( 

1028 self, choice_type: Literal["string"], constraints: StringConstraints 

1029 ) -> StringConstraints: ... 

1030 

1031 @overload 

1032 def _pooled_constraints( 

1033 self, choice_type: Literal["bytes"], constraints: BytesConstraints 

1034 ) -> BytesConstraints: ... 

1035 

1036 @overload 

1037 def _pooled_constraints( 

1038 self, choice_type: Literal["boolean"], constraints: BooleanConstraints 

1039 ) -> BooleanConstraints: ... 

1040 

1041 def _pooled_constraints( 

1042 self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT 

1043 ) -> ChoiceConstraintsT: 

1044 """Memoize common dictionary objects to reduce memory pressure.""" 

1045 # caching runs afoul of nondeterminism checks 

1046 if self.provider.avoid_realization: 

1047 return constraints 

1048 

1049 key = (choice_type, *choice_constraints_key(choice_type, constraints)) 

1050 try: 

1051 return POOLED_CONSTRAINTS_CACHE[key] 

1052 except KeyError: 

1053 POOLED_CONSTRAINTS_CACHE[key] = constraints 

1054 return constraints 

1055 

1056 def _pop_choice( 

1057 self, 

1058 choice_type: ChoiceTypeT, 

1059 constraints: ChoiceConstraintsT, 

1060 *, 

1061 forced: Optional[ChoiceT], 

1062 ) -> ChoiceT: 

1063 assert self.prefix is not None 

1064 # checked in _draw 

1065 assert self.index < len(self.prefix) 

1066 

1067 value = self.prefix[self.index] 

1068 if isinstance(value, ChoiceTemplate): 

1069 node: ChoiceTemplate = value 

1070 if node.count is not None: 

1071 assert node.count >= 0 

1072 # node templates have to be at the end for now, since it's not immediately 

1073 # apparent how to handle overruning a node template while generating a single 

1074 # node if the alternative is not "the entire data is an overrun". 

1075 assert self.index == len(self.prefix) - 1 

1076 if node.type == "simplest": 

1077 if forced is not None: 

1078 choice = forced 

1079 try: 

1080 choice = choice_from_index(0, choice_type, constraints) 

1081 except ChoiceTooLarge: 

1082 self.mark_overrun() 

1083 else: 

1084 raise NotImplementedError 

1085 

1086 if node.count is not None: 

1087 node.count -= 1 

1088 if node.count < 0: 

1089 self.mark_overrun() 

1090 return choice 

1091 

1092 choice = value 

1093 node_choice_type = { 

1094 str: "string", 

1095 float: "float", 

1096 int: "integer", 

1097 bool: "boolean", 

1098 bytes: "bytes", 

1099 }[type(choice)] 

1100 # If we're trying to: 

1101 # * draw a different choice type at the same location 

1102 # * draw the same choice type with a different constraints, which does not permit 

1103 # the current value 

1104 # 

1105 # then we call this a misalignment, because the choice sequence has 

1106 # changed from what we expected at some point. An easy misalignment is 

1107 # 

1108 # one_of(integers(0, 100), integers(101, 200)) 

1109 # 

1110 # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100} 

1111 # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at 

1112 # index 1 (which does not permit any of the values 0-100). 

1113 # 

1114 # When the choice sequence becomes misaligned, we generate a new value of the 

1115 # type and constraints the strategy expects. 

1116 if node_choice_type != choice_type or not choice_permitted(choice, constraints): 

1117 # only track first misalignment for now. 

1118 if self.misaligned_at is None: 

1119 self.misaligned_at = (self.index, choice_type, constraints, forced) 

1120 try: 

1121 # Fill in any misalignments with index 0 choices. An alternative to 

1122 # this is using the index of the misaligned choice instead 

1123 # of index 0, which may be useful for maintaining 

1124 # "similarly-complex choices" in the shrinker. This requires 

1125 # attaching an index to every choice in ConjectureData.for_choices, 

1126 # which we don't always have (e.g. when reading from db). 

1127 # 

1128 # If we really wanted this in the future we could make this complexity 

1129 # optional, use it if present, and default to index 0 otherwise. 

1130 # This complicates our internal api and so I'd like to avoid it 

1131 # if possible. 

1132 # 

1133 # Additionally, I don't think slips which require 

1134 # slipping to high-complexity values are common. Though arguably 

1135 # we may want to expand a bit beyond *just* the simplest choice. 

1136 # (we could for example consider sampling choices from index 0-10). 

1137 choice = choice_from_index(0, choice_type, constraints) 

1138 except ChoiceTooLarge: 

1139 # should really never happen with a 0-index choice, but let's be safe. 

1140 self.mark_overrun() 

1141 

1142 self.index += 1 

1143 return choice 

1144 

1145 def as_result(self) -> Union[ConjectureResult, _Overrun]: 

1146 """Convert the result of running this test into 

1147 either an Overrun object or a ConjectureResult.""" 

1148 

1149 assert self.frozen 

1150 if self.status == Status.OVERRUN: 

1151 return Overrun 

1152 if self.__result is None: 

1153 self.__result = ConjectureResult( 

1154 status=self.status, 

1155 interesting_origin=self.interesting_origin, 

1156 spans=self.spans, 

1157 nodes=self.nodes, 

1158 length=self.length, 

1159 output=self.output, 

1160 expected_traceback=self.expected_traceback, 

1161 expected_exception=self.expected_exception, 

1162 extra_information=( 

1163 self.extra_information 

1164 if self.extra_information.has_information() 

1165 else None 

1166 ), 

1167 has_discards=self.has_discards, 

1168 target_observations=self.target_observations, 

1169 tags=frozenset(self.tags), 

1170 arg_slices=self.arg_slices, 

1171 slice_comments=self.slice_comments, 

1172 misaligned_at=self.misaligned_at, 

1173 cannot_proceed_scope=self.cannot_proceed_scope, 

1174 ) 

1175 assert self.__result is not None 

1176 return self.__result 

1177 

1178 def __assert_not_frozen(self, name: str) -> None: 

1179 if self.frozen: 

1180 raise Frozen(f"Cannot call {name} on frozen ConjectureData") 

1181 

1182 def note(self, value: Any) -> None: 

1183 self.__assert_not_frozen("note") 

1184 if not isinstance(value, str): 

1185 value = repr(value) 

1186 self.output += value 

1187 

    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from *strategy*, recording a span (and, at the top
        level, per-draw timing and observability data).

        ``label`` overrides the span label; ``observe_as`` names the draw in
        observability output. Raises InvalidArgument inside ``find`` for
        strategies that don't support it; may conclude the test as invalid
        for empty strategies or when the draw depth exceeds MAX_DEPTH.
        """
        from hypothesis.internal.observability import TESTCASE_CALLBACKS
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        if self.is_find and not strategy.supports_find:
            raise InvalidArgument(
                f"Cannot use strategy {strategy!r} within a call to find "
                "(presumably because it would be invalid after the call had ended)."
            )

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws skip the timing/observability bookkeeping below.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Attach context about which draw failed before propagating.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if TESTCASE_CALLBACKS:
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            self.stop_span()

1255 

    def start_span(self, label: int) -> None:
        """Open a new span with the given *label*, notifying the provider and
        the span record, and tracking draw depth."""
        self.provider.span_start(label)
        self.__assert_not_frozen("start_span")
        self.depth += 1
        # Logically it would make sense for this to just be
        # ``self.max_depth = max(self.depth, self.max_depth)``, which is what it used
        # to be until we ran the code under tracemalloc and found a rather significant
        # chunk of allocation was happening here. This was presumably due to varargs
        # or the like, but we didn't investigate further given that it was easy
        # to fix with this check.
        if self.depth > self.max_depth:
            self.max_depth = self.depth
        self.__span_record.start_span(label)
        # Start collecting structural-coverage labels for this span.
        self.labels_for_structure_stack.append({label})

1270 

    def stop_span(self, *, discard: bool = False) -> None:
        """Close the current span; with ``discard=True`` the span's draws are
        marked as wasted work and the tree branch is pruned."""
        self.provider.span_end(discard)
        if self.frozen:
            # freeze() closes any remaining spans itself; nothing more to do.
            return
        if discard:
            self.has_discards = True
        self.depth -= 1
        assert self.depth >= -1
        self.__span_record.stop_span(discard=discard)

        labels_for_structure = self.labels_for_structure_stack.pop()

        if not discard:
            if self.labels_for_structure_stack:
                # Propagate this span's labels up to the enclosing span.
                self.labels_for_structure_stack[-1].update(labels_for_structure)
            else:
                # Top-level span closed: convert labels to structural tags.
                self.tags.update([structural_coverage(l) for l in labels_for_structure])

        if discard:
            # Once we've discarded a span, every test case starting with
            # this prefix contains discards. We prune the tree at that point so
            # as to avoid future test cases bothering with this region, on the
            # assumption that some span that you could have used instead
            # there would *not* trigger the discard. This greatly speeds up
            # test case generation in some cases, because it allows us to
            # ignore large swathes of the search space that are effectively
            # redundant.
            #
            # A scenario that can cause us problems but which we deliberately
            # have decided not to support is that if there are side effects
            # during data generation then you may end up with a scenario where
            # every good test case generates a discard because the discarded
            # section sets up important things for later. This is not terribly
            # likely and all that you see in this case is some degradation in
            # quality of testing, so we don't worry about it.
            #
            # Note that killing the branch does *not* mean we will never
            # explore below this point, and in particular we may do so during
            # shrinking. Any explicit request for a data object that starts
            # with the branch here will work just fine, but novel prefix
            # generation will avoid it, and we can use it to detect when we
            # have explored the entire tree (up to redundancy).

            self.observer.kill_branch()

1315 

1316 @property 

1317 def spans(self) -> Spans: 

1318 assert self.frozen 

1319 if self.__spans is None: 

1320 self.__spans = Spans(record=self.__span_record) 

1321 return self.__spans 

1322 

    def freeze(self) -> None:
        """Finalize this data: record finish times, close all open spans,
        mark it frozen, and notify the observer. Idempotent."""
        if self.frozen:
            return
        self.finish_time = time.perf_counter()
        self.gc_finish_time = gc_cumulative_time()

        # Always finish by closing all remaining spans so that we have a valid tree.
        while self.depth >= 0:
            self.stop_span()

        self.__span_record.freeze()
        self.frozen = True
        self.observer.conclude_test(self.status, self.interesting_origin)

1336 

1337 def choice( 

1338 self, 

1339 values: Sequence[T], 

1340 *, 

1341 forced: Optional[T] = None, 

1342 observe: bool = True, 

1343 ) -> T: 

1344 forced_i = None if forced is None else values.index(forced) 

1345 i = self.draw_integer( 

1346 0, 

1347 len(values) - 1, 

1348 forced=forced_i, 

1349 observe=observe, 

1350 ) 

1351 return values[i] 

1352 

1353 def conclude_test( 

1354 self, 

1355 status: Status, 

1356 interesting_origin: Optional[InterestingOrigin] = None, 

1357 ) -> NoReturn: 

1358 assert (interesting_origin is None) or (status == Status.INTERESTING) 

1359 self.__assert_not_frozen("conclude_test") 

1360 self.interesting_origin = interesting_origin 

1361 self.status = status 

1362 self.freeze() 

1363 raise StopTest(self.testcounter) 

1364 

1365 def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn: 

1366 self.conclude_test(Status.INTERESTING, interesting_origin) 

1367 

1368 def mark_invalid(self, why: Optional[str] = None) -> NoReturn: 

1369 if why is not None: 

1370 self.events["invalid because"] = why 

1371 self.conclude_test(Status.INVALID) 

1372 

1373 def mark_overrun(self) -> NoReturn: 

1374 self.conclude_test(Status.OVERRUN) 

1375 

1376 

def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single fresh choice of *choice_type* under *constraints*,
    using a throwaway ConjectureData seeded from *random*."""
    provider = ConjectureData(random=random).provider
    draw_method = getattr(provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_method(**constraints))