Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/data.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

629 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import math 

12import time 

13from collections import defaultdict 

14from collections.abc import Hashable, Iterable, Iterator, Sequence 

15from enum import IntEnum 

16from functools import cached_property 

17from random import Random 

18from typing import ( 

19 TYPE_CHECKING, 

20 Any, 

21 Literal, 

22 NoReturn, 

23 Optional, 

24 TypeVar, 

25 Union, 

26 cast, 

27 overload, 

28) 

29 

30import attr 

31 

32from hypothesis.errors import ( 

33 CannotProceedScopeT, 

34 ChoiceTooLarge, 

35 Frozen, 

36 InvalidArgument, 

37 StopTest, 

38) 

39from hypothesis.internal.cache import LRUCache 

40from hypothesis.internal.compat import add_note 

41from hypothesis.internal.conjecture.choice import ( 

42 BooleanConstraints, 

43 BytesConstraints, 

44 ChoiceConstraintsT, 

45 ChoiceNode, 

46 ChoiceT, 

47 ChoiceTemplate, 

48 ChoiceTypeT, 

49 FloatConstraints, 

50 IntegerConstraints, 

51 StringConstraints, 

52 choice_constraints_key, 

53 choice_from_index, 

54 choice_permitted, 

55 choices_size, 

56) 

57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time 

58from hypothesis.internal.conjecture.providers import ( 

59 COLLECTION_DEFAULT_MAX_SIZE, 

60 HypothesisProvider, 

61 PrimitiveProvider, 

62) 

63from hypothesis.internal.conjecture.utils import calc_label_from_name 

64from hypothesis.internal.escalation import InterestingOrigin 

65from hypothesis.internal.floats import ( 

66 SMALLEST_SUBNORMAL, 

67 float_to_int, 

68 int_to_float, 

69 sign_aware_lte, 

70) 

71from hypothesis.internal.intervalsets import IntervalSet 

72from hypothesis.internal.observability import PredicateCounts 

73from hypothesis.reporting import debug_report 

74from hypothesis.utils.conventions import not_set 

75from hypothesis.utils.threading import ThreadLocal 

76 

77if TYPE_CHECKING: 

78 from typing import TypeAlias 

79 

80 from hypothesis.strategies import SearchStrategy 

81 from hypothesis.strategies._internal.core import DataObject 

82 from hypothesis.strategies._internal.random import RandomState 

83 from hypothesis.strategies._internal.strategies import Ex 

84 

85 

def __getattr__(name: str) -> Any:
    """Module-level attribute hook (PEP 562).

    Provides a deprecation shim for ``AVAILABLE_PROVIDERS``, which now lives
    in ``hypothesis.internal.conjecture.providers``. Any other missing
    attribute raises AttributeError as usual.
    """
    if name != "AVAILABLE_PROVIDERS":
        raise AttributeError(
            f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
        )

    from hypothesis._settings import note_deprecation
    from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

    note_deprecation(
        "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
        "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
        since="2025-01-25",
        has_codemod=False,
        stacklevel=1,
    )
    return AVAILABLE_PROVIDERS

103 

104 

# Generic type variable shared by draw helpers in this module.
T = TypeVar("T")
# Mapping of target label -> observed metric value, used by targeted PBT.
TargetObservations = dict[str, Union[int, float]]
# index, choice_type, constraints, forced value
# Records where a replayed choice sequence stopped matching its prefix.
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label of the implicit top-level span wrapping each test case (see
# ConjectureData.__init__, which calls start_span(TOP_LABEL)).
TOP_LABEL = calc_label_from_name("top")
# NOTE(review): presumably bounds the nesting depth of strategy draws —
# confirm at the use sites (not visible in this chunk).
MAX_DEPTH = 100

# Per-thread counter; each ConjectureData takes its testcounter from this
# and increments it (see ConjectureData.__init__).
threadlocal = ThreadLocal(global_test_counter=int)

116 

117 

class ExtraInformation:
    """Shared mutable state attached to a ``ConjectureData`` that should be
    carried over onto the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        attrs = ", ".join(f"{key}={value!r}" for key, value in self.__dict__.items())
        return f"ExtraInformation({attrs})"

    def has_information(self) -> bool:
        """True once any attribute has been stored on this object."""
        return bool(self.__dict__)

130 

class Status(IntEnum):
    """Outcome of running a test case, ordered from worst to best."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return "Status." + self.name

140 

@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    # Hashable, immutable marker wrapping a span label. Interned via
    # STRUCTURAL_COVERAGE_CACHE / structural_coverage() below and collected
    # into ConjectureData.tags.
    label: int = attr.ib()


# Interning cache: equal labels share a single tag instance.
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}

148 

def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the canonical (interned) tag object for ``label``.

    Uses ``setdefault`` on a miss so concurrent callers still agree on a
    single instance per label.
    """
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag

155 

# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Maps a hashable key derived from (choice_type, constraints) to a shared
# constraints dict — see ConjectureData._pooled_constraints (not visible in
# this chunk).
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)

160 

class Span:
    """A span marks a hierarchically-structured region of the choices made
    during a single test run.

    Spans identify runs of choices that belong together logically.
    Hypothesis records, for example:
    - one top-level span covering the whole choice sequence
    - one span for the choices made by each strategy
    - extra spans created by individual strategies; st.lists(), for
      instance, separates the "should add another element" choice from the
      "add another element" choices.

    This structure feeds the shrinker, the mutator, targeted
    property-based testing, and other Hypothesis subsystems.

    A ``Span`` is deliberately not a rich object: it is just an index into
    the ``Spans`` collection defined below, which keeps the real data in
    compact integer lists. That serves two goals. First, most span
    properties are never inspected, so we usually avoid allocating storage
    for them at all. Second, flat integer storage is far smaller than
    ordinary Python objects.

    The trade-off is extra allocation (identical Span or int objects get
    re-created repeatedly), which slows some usage patterns down, but the
    memory savings are usually dramatic enough to be worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        return self.owner is other.owner and self.index == other.index

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return self.owner is not other.owner or self.index != other.index

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value tying this span back to its approximate origin,
        such as a particular strategy class or kind of draw."""
        owner = self.owner
        return owner.labels[owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """Index of the span this one is nested directly within, or None
        for the top-level span."""
        if self.index == 0:
            return None
        return self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Choice index at which this span begins."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Choice index at which this span ends."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Nesting depth within the span tree; the top-level span is 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """Whether this span's ``stop_span`` call passed ``discard=True``.
        That means we believe the shrinker can delete the span entirely
        without changing the value produced by the enclosing strategy —
        typically a rejection sampler throwing away a candidate value and
        retrying."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """How many choices fall inside this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans whose parent is this span, in increasing index order."""
        owner = self.owner
        return [owner[child] for child in owner.children[self.index]]

261 

class SpanProperty:
    """Visitor that recomputes per-span properties by replaying the
    recorded trail of start/stop/choice events.

    Many span properties are calculated by effectively rerunning the test
    case; subclasses override ``start_span``, ``stop_span``, and
    ``finish`` to accumulate whatever they need during the replay.
    """

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Replay the recorded trail and return ``self.finish()``."""
        for record in self.spans.trail:
            if record == TrailType.CHOICE:
                self.choice_count += 1
            elif record == TrailType.STOP_SPAN_DISCARD:
                self.__pop(discarded=True)
            elif record == TrailType.STOP_SPAN_NO_DISCARD:
                self.__pop(discarded=False)
            else:
                # Anything above TrailType.CHOICE encodes a span start; the
                # stored value is the label index offset by CHOICE + 1.
                self.__push(record - TrailType.CHOICE - 1)

        return self.finish()

    def __push(self, label_index: int) -> None:
        span_index = self.span_count
        assert span_index < len(self.spans)
        self.start_span(span_index, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(span_index)

    def __pop(self, *, discarded: bool) -> None:
        self.stop_span(self.span_stack.pop(), discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Hook called at the start of each span; ``i`` is the span's index
        and ``label_index`` the index of its label in
        ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Hook called at the end of each span; ``i`` is the span's index
        and ``discarded`` is True when ``stop_span`` was originally called
        with ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError

317 

class TrailType(IntEnum):
    # Sentinel values stored in SpanRecord.trail; the numeric values are
    # load-bearing (span-start records are offsets above CHOICE, below).
    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.

326 

class SpanRecord:
    """Compact event log of ``start_span``, ``stop_span``, and
    ``draw_bits`` calls, stored so that ``Spans`` can replay them whenever
    we need to know about the structure of individual ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        # label -> position in self.labels; dropped on freeze() since no
        # further spans can be started after that.
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__index_of_labels is not None
        label_index = self.__index_of_labels.get(label)
        if label_index is None:
            label_index = self.__index_of_labels.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(TrailType.CHOICE + 1 + label_index)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)

class _starts_and_ends(SpanProperty):
    """Computes the (start, end) choice offsets of every span in one replay."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        size = len(self.spans)
        self.starts = IntList.of_length(size)
        self.ends = IntList.of_length(size)

    def start_span(self, i: int, label_index: int) -> None:
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return self.starts, self.ends

381 

class _discarded(SpanProperty):
    """Collects the indices of spans that were stopped with discard=True."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)

394 

class _parentage(SpanProperty):
    """Computes, for every span, the index of its parent span.

    The top-level span (index 0) keeps the IntList default of 0."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if i:
            # By the time span i stops, the top of the stack is its parent.
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result

407 

class _depths(SpanProperty):
    """Computes the nesting depth of every span (stack height at its start)."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result

419 

class _label_indices(SpanProperty):
    """Computes, for every span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result

431 

class _mutator_groups(SpanProperty):
    """Groups spans by label and reports the groups whose (start, end)
    ranges the mutator could usefully swap between."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # A group containing a single span gives the mutator nothing to
        # swap, so drop those.
        return [group for group in self.groups.values() if len(group) >= 2]

449 

class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the record of recorded behaviour in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Every span contributes exactly one stop marker to the trail, so
        # the span count is the total number of stop markers of either kind.
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        # Built lazily by the ``children`` property below.
        self.__children: Optional[list[Sequence[int]]] = None

    # starts and ends are computed together in a single replay, then split
    # by the two properties below.
    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    # Indices of spans stopped with discard=True.
    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    # parentage[i] is the parent index of span i (0 for the root itself).
    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        # Lazily derive per-span child index lists from the parentage table.
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        # Supports negative indices like a normal sequence.
        n = self.__length
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Span(self, i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]

533 

class _Overrun:
    # Sentinel result type used in place of a ConjectureResult when a run
    # exceeded its budget; only `status` is available on it.
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# The single shared _Overrun instance used throughout this module.
Overrun = _Overrun()

543 

class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache."""

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    # The draw_* hooks below are no-ops by default; subclasses override the
    # ones they care about. Each receives the drawn value, the constraints
    # it was drawn under, and whether the value was forced.

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        pass

@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    # Outcome of the run, plus the failure origin when INTERESTING.
    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # The choice sequence; excluded from equality and repr for compactness.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    # Total size of the choices (see choices_size in _draw).
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    # Exception (and its traceback) that we expect this data to reproduce.
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    # Observations for targeted property-based testing.
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    # which-parts-matter reporting data (see ConjectureData.arg_slices).
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # Where (if anywhere) replay diverged from the recorded prefix.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Already a result; mirrors ConjectureData.as_result for uniform use.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        """The raw choice values, in draw order."""
        return tuple(node.value for node in self.nodes)

620class ConjectureData: 

    @classmethod
    def for_choices(
        cls,
        choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
        *,
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        random: Optional[Random] = None,
    ) -> "ConjectureData":
        """Build a ConjectureData that replays ``choices`` as its prefix,
        with ``max_choices`` capped at the sequence's choice count."""
        # Local import — presumably avoids a circular import with the
        # engine module; confirm before moving to the top of the file.
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )

    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set up a fresh test-case record.

        ``provider`` may be a PrimitiveProvider class (instantiated here,
        with ``provider_kw``) or an already-constructed instance, in which
        case ``provider_kw`` must not be passed. ``prefix`` holds choices
        to replay before drawing fresh ones; ``max_choices`` and the
        engine's BUFFER_SIZE bound the run.

        :raises InvalidArgument: if ``provider_kw`` is given alongside a
            provider instance.
        """
        # Local import — presumably avoids a circular import with the
        # engine module; confirm before moving to the top of the file.
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Unique id for this test case, taken from a per-thread counter.
        self.testcounter = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # Instantiate the provider here when a class was passed; otherwise
        # use the instance directly.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self._shared_data_strategy: Optional[DataObject] = None
        self._stateful_repr_parts: Optional[list[Any]] = None
        self.states_for_ids: Optional[dict[int, RandomState]] = None
        self.seeds_to_states: Optional[dict[Any, RandomState]] = None
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        # Set when replay diverges from the prefix; see MisalignedAt.
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the implicit top-level span covering the whole run.
        self.start_span(TOP_LABEL)

741 def __repr__(self) -> str: 

742 return "ConjectureData(%s, %d choices%s)" % ( 

743 self.status.name, 

744 len(self.nodes), 

745 ", frozen" if self.frozen else "", 

746 ) 

747 

748 @property 

749 def choices(self) -> tuple[ChoiceT, ...]: 

750 return tuple(node.value for node in self.nodes) 

751 

752 # draw_* functions might be called in one of two contexts: either "above" or 

753 # "below" the choice sequence. For instance, draw_string calls draw_boolean 

754 # from ``many`` when calculating the number of characters to return. We do 

755 # not want these choices to get written to the choice sequence, because they 

756 # are not true choices themselves. 

757 # 

758 # `observe` formalizes this. The choice will only be written to the choice 

759 # sequence if observe is True. 

760 

    # Typed overloads for _draw: each choice_type literal pairs with its
    # constraints type and return type. The implementation follows below.
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Core draw: produce one choice of ``choice_type`` under
        ``constraints``.

        The value comes from the replay prefix when one is active, from
        ``forced`` when given, or from the provider otherwise. When
        ``observe`` is True the choice is reported to the observer and
        appended to the choice sequence; sub-draws made while computing a
        larger choice pass observe=False and leave no trace here.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        # _pop_choice (defined elsewhere in this file) replays the next
        # prefix choice, realigning it against these constraints.
        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            # Size accounting is skipped when the provider wants to avoid
            # realizing symbolic values.
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> int:
        """Draw an integer, optionally bounded and weighted.

        ``weights`` maps specific values to draw probabilities; it
        requires both bounds and must sum to strictly less than 1, with
        the remaining mass spread over the unmapped values. ``forced``
        pins the result (still recorded as a choice); ``observe=False``
        keeps the draw off the choice sequence. Argument validation uses
        assert, so it is internal-only and stripped under -O.
        """
        # Validate arguments
        if weights is not None:
            assert min_value is not None
            assert max_value is not None
            assert len(weights) <= 255  # arbitrary practical limit
            # We can and should eventually support total weights. But this
            # complicates shrinking as we can no longer assume we can force
            # a value to the unmapped probability mass if that mass might be 0.
            assert sum(weights.values()) < 1
            # similarly, things get simpler if we assume every value is possible.
            # we'll want to drop this restriction eventually.
            assert all(w != 0 for w in weights.values())

        if forced is not None and min_value is not None:
            assert min_value <= forced
        if forced is not None and max_value is not None:
            assert forced <= max_value

        # Pool the constraints dict so equal draws share a single object.
        constraints: IntegerConstraints = self._pooled_constraints(
            "integer",
            {
                "min_value": min_value,
                "max_value": max_value,
                "weights": weights,
                "shrink_towards": shrink_towards,
            },
        )
        return self._draw("integer", constraints, observe=observe, forced=forced)

    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: Optional[float] = None,
        observe: bool = True,
    ) -> float:
        """Draw a float within [min_value, max_value] (sign-aware), with
        nan allowed unless ``allow_nan=False`` and nonzero magnitudes
        bounded below by ``smallest_nonzero_magnitude``.

        ``forced`` pins the result; ``observe=False`` keeps the draw off
        the choice sequence.

        :raises FloatingPointError: if subnormals have been disabled at
            the processor level (e.g. by -ffast-math).
        """
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        # A denormalized smallest_nonzero_magnitude collapsing to 0.0 means
        # the runtime cannot represent subnormals at all.
        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            # sign_aware_lte distinguishes -0.0 from 0.0 when checking bounds.
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        # Pool the constraints dict so equal draws share a single object.
        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)

970 def draw_string( 

971 self, 

972 intervals: IntervalSet, 

973 *, 

974 min_size: int = 0, 

975 max_size: int = COLLECTION_DEFAULT_MAX_SIZE, 

976 forced: Optional[str] = None, 

977 observe: bool = True, 

978 ) -> str: 

979 assert forced is None or min_size <= len(forced) <= max_size 

980 assert min_size >= 0 

981 if len(intervals) == 0: 

982 assert min_size == 0 

983 

984 constraints: StringConstraints = self._pooled_constraints( 

985 "string", 

986 { 

987 "intervals": intervals, 

988 "min_size": min_size, 

989 "max_size": max_size, 

990 }, 

991 ) 

992 return self._draw("string", constraints, observe=observe, forced=forced) 

993 

994 def draw_bytes( 

995 self, 

996 min_size: int = 0, 

997 max_size: int = COLLECTION_DEFAULT_MAX_SIZE, 

998 *, 

999 forced: Optional[bytes] = None, 

1000 observe: bool = True, 

1001 ) -> bytes: 

1002 assert forced is None or min_size <= len(forced) <= max_size 

1003 assert min_size >= 0 

1004 

1005 constraints: BytesConstraints = self._pooled_constraints( 

1006 "bytes", {"min_size": min_size, "max_size": max_size} 

1007 ) 

1008 return self._draw("bytes", constraints, observe=observe, forced=forced) 

1009 

1010 def draw_boolean( 

1011 self, 

1012 p: float = 0.5, 

1013 *, 

1014 forced: Optional[bool] = None, 

1015 observe: bool = True, 

1016 ) -> bool: 

1017 assert (forced is not True) or p > 0 

1018 assert (forced is not False) or p < 1 

1019 

1020 constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p}) 

1021 return self._draw("boolean", constraints, observe=observe, forced=forced) 

1022 

    # The overloads below give callers a precise per-choice-type return type;
    # only the final (untyped-dispatch) definition has a body.
    @overload
    def _pooled_constraints(
        self, choice_type: Literal["integer"], constraints: IntegerConstraints
    ) -> IntegerConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["float"], constraints: FloatConstraints
    ) -> FloatConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["string"], constraints: StringConstraints
    ) -> StringConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["bytes"], constraints: BytesConstraints
    ) -> BytesConstraints: ...

    @overload
    def _pooled_constraints(
        self, choice_type: Literal["boolean"], constraints: BooleanConstraints
    ) -> BooleanConstraints: ...

    def _pooled_constraints(
        self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
    ) -> ChoiceConstraintsT:
        """Memoize common constraints dictionaries to reduce memory pressure.

        Returns a previously-pooled dict equal to *constraints* when one
        exists, so repeated draws with identical constraints share a single
        object; otherwise pools and returns *constraints* itself.
        """
        # caching runs afoul of nondeterminism checks
        if self.provider.avoid_realization:
            return constraints

        # The key includes the choice type because different types may have
        # structurally identical constraint dicts.
        key = (choice_type, *choice_constraints_key(choice_type, constraints))
        try:
            return POOLED_CONSTRAINTS_CACHE[key]
        except KeyError:
            POOLED_CONSTRAINTS_CACHE[key] = constraints
            return constraints

1062 

    def _pop_choice(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Take the next value from ``self.prefix`` for a draw of *choice_type*.

        Handles three cases: a ``ChoiceTemplate`` placeholder (expanded to a
        concrete value), a concrete prefix value that matches the requested
        type/constraints (returned as-is), and a misaligned prefix value
        (replaced by the simplest permitted choice). Advances ``self.index``.
        """
        assert self.prefix is not None
        # checked in _draw
        assert self.index < len(self.prefix)

        value = self.prefix[self.index]
        if isinstance(value, ChoiceTemplate):
            node: ChoiceTemplate = value
            if node.count is not None:
                assert node.count >= 0
            # node templates have to be at the end for now, since it's not immediately
            # apparent how to handle overruning a node template while generating a single
            # node if the alternative is not "the entire data is an overrun".
            assert self.index == len(self.prefix) - 1
            if node.type == "simplest":
                if forced is not None:
                    choice = forced
                # NOTE(review): the assignment from ``forced`` above appears to be
                # unconditionally overwritten here — confirm whether this try block
                # was meant to sit in an ``else:`` branch so forced values survive.
                try:
                    choice = choice_from_index(0, choice_type, constraints)
                except ChoiceTooLarge:
                    # Even the simplest choice exceeds remaining budget.
                    self.mark_overrun()
            else:
                raise NotImplementedError

            if node.count is not None:
                node.count -= 1
                if node.count < 0:
                    self.mark_overrun()
            return choice

        choice = value
        # Recover the choice type of the concrete prefix value from its
        # Python type, to compare against the requested choice type.
        node_choice_type = {
            str: "string",
            float: "float",
            int: "integer",
            bool: "boolean",
            bytes: "bytes",
        }[type(choice)]
        # If we're trying to:
        # * draw a different choice type at the same location
        # * draw the same choice type with a different constraints, which does not permit
        #   the current value
        #
        # then we call this a misalignment, because the choice sequence has
        # changed from what we expected at some point. An easy misalignment is
        #
        #   one_of(integers(0, 100), integers(101, 200))
        #
        # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
        # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
        # index 1 (which does not permit any of the values 0-100).
        #
        # When the choice sequence becomes misaligned, we generate a new value of the
        # type and constraints the strategy expects.
        if node_choice_type != choice_type or not choice_permitted(choice, constraints):
            # only track first misalignment for now.
            if self.misaligned_at is None:
                self.misaligned_at = (self.index, choice_type, constraints, forced)
            try:
                # Fill in any misalignments with index 0 choices. An alternative to
                # this is using the index of the misaligned choice instead
                # of index 0, which may be useful for maintaining
                # "similarly-complex choices" in the shrinker. This requires
                # attaching an index to every choice in ConjectureData.for_choices,
                # which we don't always have (e.g. when reading from db).
                #
                # If we really wanted this in the future we could make this complexity
                # optional, use it if present, and default to index 0 otherwise.
                # This complicates our internal api and so I'd like to avoid it
                # if possible.
                #
                # Additionally, I don't think slips which require
                # slipping to high-complexity values are common. Though arguably
                # we may want to expand a bit beyond *just* the simplest choice.
                # (we could for example consider sampling choices from index 0-10).
                choice = choice_from_index(0, choice_type, constraints)
            except ChoiceTooLarge:
                # should really never happen with a 0-index choice, but let's be safe.
                self.mark_overrun()

        self.index += 1
        return choice

1151 

1152 def as_result(self) -> Union[ConjectureResult, _Overrun]: 

1153 """Convert the result of running this test into 

1154 either an Overrun object or a ConjectureResult.""" 

1155 

1156 assert self.frozen 

1157 if self.status == Status.OVERRUN: 

1158 return Overrun 

1159 if self.__result is None: 

1160 self.__result = ConjectureResult( 

1161 status=self.status, 

1162 interesting_origin=self.interesting_origin, 

1163 spans=self.spans, 

1164 nodes=self.nodes, 

1165 length=self.length, 

1166 output=self.output, 

1167 expected_traceback=self.expected_traceback, 

1168 expected_exception=self.expected_exception, 

1169 extra_information=( 

1170 self.extra_information 

1171 if self.extra_information.has_information() 

1172 else None 

1173 ), 

1174 has_discards=self.has_discards, 

1175 target_observations=self.target_observations, 

1176 tags=frozenset(self.tags), 

1177 arg_slices=self.arg_slices, 

1178 slice_comments=self.slice_comments, 

1179 misaligned_at=self.misaligned_at, 

1180 cannot_proceed_scope=self.cannot_proceed_scope, 

1181 ) 

1182 assert self.__result is not None 

1183 return self.__result 

1184 

1185 def __assert_not_frozen(self, name: str) -> None: 

1186 if self.frozen: 

1187 raise Frozen(f"Cannot call {name} on frozen ConjectureData") 

1188 

1189 def note(self, value: Any) -> None: 

1190 self.__assert_not_frozen("note") 

1191 if not isinstance(value, str): 

1192 value = repr(value) 

1193 self.output += value 

1194 

    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        """Draw a value from *strategy* as part of this test case.

        This is the entry point strategies use to obtain values. It validates
        the strategy, enforces the depth limit, wraps the draw in a span, and
        for top-level draws records per-draw timing (GC time excluded) and,
        when observability is enabled, a JSON-able form of the drawn value.

        Concludes the test (raising StopTest via mark_invalid) for empty
        strategies, exceeded depth, or find()-incompatible strategies.
        """
        from hypothesis.internal.observability import observability_enabled
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        if self.is_find and not strategy.supports_find:
            raise InvalidArgument(
                f"Cannot use strategy {strategy!r} within a call to find "
                "(presumably because it would be invalid after the call had ended)."
            )

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                # Nested draws skip the timing/observability bookkeeping.
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                # Attach draw context to the exception for better error reports.
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if observability_enabled():
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            # Close the span even when the draw concluded the test or raised.
            self.stop_span()

1262 

1263 def start_span(self, label: int) -> None: 

1264 self.provider.span_start(label) 

1265 self.__assert_not_frozen("start_span") 

1266 self.depth += 1 

1267 # Logically it would make sense for this to just be 

1268 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to 

1269 # be until we ran the code under tracemalloc and found a rather significant 

1270 # chunk of allocation was happening here. This was presumably due to varargs 

1271 # or the like, but we didn't investigate further given that it was easy 

1272 # to fix with this check. 

1273 if self.depth > self.max_depth: 

1274 self.max_depth = self.depth 

1275 self.__span_record.start_span(label) 

1276 self.labels_for_structure_stack.append({label}) 

1277 

1278 def stop_span(self, *, discard: bool = False) -> None: 

1279 self.provider.span_end(discard) 

1280 if self.frozen: 

1281 return 

1282 if discard: 

1283 self.has_discards = True 

1284 self.depth -= 1 

1285 assert self.depth >= -1 

1286 self.__span_record.stop_span(discard=discard) 

1287 

1288 labels_for_structure = self.labels_for_structure_stack.pop() 

1289 

1290 if not discard: 

1291 if self.labels_for_structure_stack: 

1292 self.labels_for_structure_stack[-1].update(labels_for_structure) 

1293 else: 

1294 self.tags.update([structural_coverage(l) for l in labels_for_structure]) 

1295 

1296 if discard: 

1297 # Once we've discarded a span, every test case starting with 

1298 # this prefix contains discards. We prune the tree at that point so 

1299 # as to avoid future test cases bothering with this region, on the 

1300 # assumption that some span that you could have used instead 

1301 # there would *not* trigger the discard. This greatly speeds up 

1302 # test case generation in some cases, because it allows us to 

1303 # ignore large swathes of the search space that are effectively 

1304 # redundant. 

1305 # 

1306 # A scenario that can cause us problems but which we deliberately 

1307 # have decided not to support is that if there are side effects 

1308 # during data generation then you may end up with a scenario where 

1309 # every good test case generates a discard because the discarded 

1310 # section sets up important things for later. This is not terribly 

1311 # likely and all that you see in this case is some degradation in 

1312 # quality of testing, so we don't worry about it. 

1313 # 

1314 # Note that killing the branch does *not* mean we will never 

1315 # explore below this point, and in particular we may do so during 

1316 # shrinking. Any explicit request for a data object that starts 

1317 # with the branch here will work just fine, but novel prefix 

1318 # generation will avoid it, and we can use it to detect when we 

1319 # have explored the entire tree (up to redundancy). 

1320 

1321 self.observer.kill_branch() 

1322 

1323 @property 

1324 def spans(self) -> Spans: 

1325 assert self.frozen 

1326 if self.__spans is None: 

1327 self.__spans = Spans(record=self.__span_record) 

1328 return self.__spans 

1329 

1330 def freeze(self) -> None: 

1331 if self.frozen: 

1332 return 

1333 self.finish_time = time.perf_counter() 

1334 self.gc_finish_time = gc_cumulative_time() 

1335 

1336 # Always finish by closing all remaining spans so that we have a valid tree. 

1337 while self.depth >= 0: 

1338 self.stop_span() 

1339 

1340 self.__span_record.freeze() 

1341 self.frozen = True 

1342 self.observer.conclude_test(self.status, self.interesting_origin) 

1343 

1344 def choice( 

1345 self, 

1346 values: Sequence[T], 

1347 *, 

1348 forced: Optional[T] = None, 

1349 observe: bool = True, 

1350 ) -> T: 

1351 forced_i = None if forced is None else values.index(forced) 

1352 i = self.draw_integer( 

1353 0, 

1354 len(values) - 1, 

1355 forced=forced_i, 

1356 observe=observe, 

1357 ) 

1358 return values[i] 

1359 

1360 def conclude_test( 

1361 self, 

1362 status: Status, 

1363 interesting_origin: Optional[InterestingOrigin] = None, 

1364 ) -> NoReturn: 

1365 assert (interesting_origin is None) or (status == Status.INTERESTING) 

1366 self.__assert_not_frozen("conclude_test") 

1367 self.interesting_origin = interesting_origin 

1368 self.status = status 

1369 self.freeze() 

1370 raise StopTest(self.testcounter) 

1371 

    def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
        """Conclude this test as INTERESTING (a failure), recording where the
        failure originated. Always raises StopTest."""
        self.conclude_test(Status.INTERESTING, interesting_origin)

1374 

    def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
        """Conclude this test as INVALID, optionally recording *why* in the
        test-case events. Always raises StopTest."""
        if why is not None:
            self.events["invalid because"] = why
        self.conclude_test(Status.INVALID)

1379 

    def mark_overrun(self) -> NoReturn:
        """Conclude this test as OVERRUN (exceeded the choice-sequence budget).
        Always raises StopTest."""
        self.conclude_test(Status.OVERRUN)

1382 

1383 

def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single fresh choice of *choice_type* under *constraints*, using
    a throwaway ConjectureData seeded from *random*."""
    data = ConjectureData(random=random)
    draw_fn = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_fn(**constraints))