Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/data.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

622 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import math 

12import time 

13from collections import defaultdict 

14from collections.abc import Hashable, Iterable, Iterator, Sequence 

15from enum import IntEnum 

16from functools import cached_property 

17from random import Random 

18from typing import ( 

19 TYPE_CHECKING, 

20 Any, 

21 Literal, 

22 NoReturn, 

23 Optional, 

24 TypeVar, 

25 Union, 

26 cast, 

27 overload, 

28) 

29 

30import attr 

31 

32from hypothesis.errors import ( 

33 CannotProceedScopeT, 

34 ChoiceTooLarge, 

35 Frozen, 

36 InvalidArgument, 

37 StopTest, 

38) 

39from hypothesis.internal.cache import LRUCache 

40from hypothesis.internal.compat import add_note 

41from hypothesis.internal.conjecture.choice import ( 

42 BooleanConstraints, 

43 BytesConstraints, 

44 ChoiceConstraintsT, 

45 ChoiceNode, 

46 ChoiceT, 

47 ChoiceTemplate, 

48 ChoiceTypeT, 

49 FloatConstraints, 

50 IntegerConstraints, 

51 StringConstraints, 

52 choice_constraints_key, 

53 choice_from_index, 

54 choice_permitted, 

55 choices_size, 

56) 

57from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time 

58from hypothesis.internal.conjecture.providers import ( 

59 COLLECTION_DEFAULT_MAX_SIZE, 

60 HypothesisProvider, 

61 PrimitiveProvider, 

62) 

63from hypothesis.internal.conjecture.utils import calc_label_from_name 

64from hypothesis.internal.escalation import InterestingOrigin 

65from hypothesis.internal.floats import ( 

66 SMALLEST_SUBNORMAL, 

67 float_to_int, 

68 int_to_float, 

69 sign_aware_lte, 

70) 

71from hypothesis.internal.intervalsets import IntervalSet 

72from hypothesis.internal.observability import PredicateCounts 

73from hypothesis.reporting import debug_report 

74from hypothesis.utils.conventions import not_set 

75 

76if TYPE_CHECKING: 

77 from typing import TypeAlias 

78 

79 from hypothesis.strategies import SearchStrategy 

80 from hypothesis.strategies._internal.strategies import Ex 

81 

82 

83def __getattr__(name: str) -> Any: 

84 if name == "AVAILABLE_PROVIDERS": 

85 from hypothesis._settings import note_deprecation 

86 from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS 

87 

88 note_deprecation( 

89 "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to " 

90 "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.", 

91 since="2025-01-25", 

92 has_codemod=False, 

93 stacklevel=1, 

94 ) 

95 return AVAILABLE_PROVIDERS 

96 

97 raise AttributeError( 

98 f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}" 

99 ) 

100 

101 

# Generic type variable used by draw helpers elsewhere in this module.
T = TypeVar("T")
# Mapping of target label -> observed metric, used for targeted search.
TargetObservations = dict[str, Union[int, float]]
# (index, choice_type, constraints, forced value) describing the first point
# where a replayed prefix disagreed with the requested draw.
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]

# Label for the single top-level span wrapping the whole choice sequence.
TOP_LABEL = calc_label_from_name("top")

110 

111 

class ExtraInformation:
    """A class for holding shared state on a ``ConjectureData`` that should
    be added to the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        # Render whatever ad-hoc attributes have been stored on the instance.
        rendered = ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items())
        return f"ExtraInformation({rendered})"

    def has_information(self) -> bool:
        # True iff at least one attribute has been attached.
        return len(self.__dict__) > 0

123 

124 

class Status(IntEnum):
    """Outcome of a single test-case execution.

    An ``IntEnum`` so outcomes have a total numeric ordering
    (OVERRUN < INVALID < VALID < INTERESTING).
    """

    OVERRUN = 0  # ran out of data/choices (see mark_overrun calls in _draw)
    INVALID = 1  # NOTE(review): presumably a rejected run — set elsewhere
    VALID = 2  # the default status a ConjectureData starts with
    INTERESTING = 3  # paired with an InterestingOrigin on the result

    def __repr__(self) -> str:
        return f"Status.{self.name}"

133 

134 

# Immutable, hashable wrapper around a span label, used as a coverage tag.
# frozen=True lets instances live safely in the interning cache below.
@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    label: int = attr.ib()

138 

139 

STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the interned ``StructuralCoverageTag`` for ``label``.

    Tags are cached so that equal labels share a single instance.
    """
    cached = STRUCTURAL_COVERAGE_CACHE.get(label)
    if cached is not None:
        return cached
    # setdefault keeps the first-stored tag even under concurrent insertion.
    return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))

148 

149 

# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
# Keyed by (choice_type, constraints-key) tuples; see _pooled_constraints.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)

153 

154 

class Span:
    """A span tracks the hierarchical structure of choices within a single
    test run.

    Spans mark regions of the choice sequence that are logically related:
    Hypothesis records one top-level span for the whole sequence, one span
    per strategy draw, and some strategies (e.g. ``st.lists()``) record
    additional internal spans. This structure feeds the shrinker, the
    mutator, targeted PBT, and other subsystems.

    A ``Span`` holds no data of its own: it is just an index into the
    ``Spans`` collection defined below, which stores all properties as
    compact integer lists. That avoids allocating storage for properties
    that are never read, and is far smaller than ordinary Python objects —
    at the cost of re-allocating small Span/int objects on access.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        # Same backing store (identity) and same position within it.
        return self.index == other.index and self.owner is other.owner

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return self.index != other.index or self.owner is not other.owner

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """An opaque value associating this span with its approximate
        origin, such as a particular strategy class or kind of draw."""
        owner = self.owner
        return owner.labels[owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """The index of the span this one is nested directly within, or
        ``None`` for the top-level span."""
        if self.index:
            return self.owner.parentage[self.index]
        return None

    @property
    def start(self) -> int:
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """Depth of this span in the span tree; the top-level span is 0."""
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard=True``,
        meaning the shrinker should be able to delete it entirely without
        affecting the enclosing strategy's value (typical of rejection
        sampling)."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """All spans with this one as parent, in increasing index order."""
        return [self.owner[child] for child in self.owner.children[self.index]]

254 

255 

class SpanProperty:
    """There are many properties of spans that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in SpanProperty.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, spans: "Spans"):
        # Indices of currently-open spans, innermost last.
        self.span_stack: list[int] = []
        self.spans = spans
        # Running counters updated while replaying the trail.
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        for record in self.spans.trail:
            # Order matters: CHOICE is itself >= START_SPAN (it is a hashed
            # label value), so it must be matched before the range check.
            if record == TrailType.CHOICE:
                self.choice_count += 1
            elif record >= TrailType.START_SPAN:
                # Start markers are encoded as START_SPAN + label_index.
                self.__push(record - TrailType.START_SPAN)
            else:
                assert record in (
                    TrailType.STOP_SPAN_DISCARD,
                    TrailType.STOP_SPAN_NO_DISCARD,
                )
                self.__pop(discarded=record == TrailType.STOP_SPAN_DISCARD)
        return self.finish()

    def __push(self, label_index: int) -> None:
        # Open span number ``i`` (spans are numbered in start order).
        i = self.span_count
        assert i < len(self.spans)
        self.start_span(i, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        # Note: the span is popped *before* stop_span runs, so during
        # stop_span the stack top is the parent span (see _parentage).
        i = self.span_stack.pop()
        self.stop_span(i, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError

310 

311 

class TrailType(IntEnum):
    """Markers stored in ``SpanRecord.trail``.

    Span starts are encoded as ``START_SPAN + label_index``, so any value
    >= START_SPAN (other than CHOICE) denotes a span start. CHOICE is
    derived from a label hash; ``SpanProperty.run`` compares against it
    before the ``>= START_SPAN`` range check to disambiguate.
    """

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    START_SPAN = 3
    CHOICE = calc_label_from_name("ir draw record")

317 

318 

class SpanRecord:
    """Records the series of ``start_span``, ``stop_span``, and
    ``draw_bits`` calls so that these may be stored in ``Spans`` and
    replayed when we need to know about the structure of individual
    ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        # Distinct labels in first-seen order; trail entries index into this.
        self.labels: list[int] = []
        # label -> position in self.labels; dropped on freeze().
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        # Release the interning dict; no further labels may be recorded.
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__index_of_labels is not None
        if label in self.__index_of_labels:
            i = self.__index_of_labels[label]
        else:
            i = len(self.labels)
            self.__index_of_labels[label] = i
            self.labels.append(label)
        # Encode the start marker as START_SPAN + label index.
        self.trail.append(TrailType.START_SPAN + i)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)

356 

357 

class _starts_and_ends(SpanProperty):
    # Computes, for every span, the choice index at which it starts and ends.

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.starts = IntList.of_length(len(self.spans))
        self.ends = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # choice_count at span start == index of the first choice inside it.
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # Exclusive end: index one past the last choice inside the span.
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)

372 

373 

class _discarded(SpanProperty):
    # Collects the set of span indices closed with ``discard=True``.

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

385 

386 

class _parentage(SpanProperty):
    # Computes, for every span, the index of its enclosing parent span.

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        # SpanProperty pops ``i`` off the stack before calling stop_span,
        # so the stack top here is i's parent. Span 0 (top-level) has none.
        if i > 0:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result

398 

399 

class _depths(SpanProperty):
    # Computes the nesting depth of every span (top-level span is depth 0).

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # start_span runs before ``i`` is pushed, so the stack length equals
        # the number of open ancestors, i.e. this span's depth.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result

410 

411 

class _label_indices(SpanProperty):
    # Records, per span, the index of its label in ``spans.labels``.

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result

422 

423 

class _mutator_groups(SpanProperty):
    # Groups spans by label; each group is a set of (start, end) choice
    # ranges that the mutator may swap between.

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        key = (self.spans[i].start, self.spans[i].end)
        self.groups[label_index].add(key)

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # Discard groups with only one span, since the mutator can't
        # do anything useful with them.
        return [g for g in self.groups.values() if len(g) >= 2]

440 

441 

class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the record of recorded behaviour in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Every span contributes exactly one stop marker (of either flavour)
        # to the trail, so counting them gives the number of spans.
        # (Consistency fix: count both flavours via self.trail rather than
        # mixing self.trail and record.trail — they are the same object.)
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + self.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        """Per-span lists of child span indices, computed lazily from
        ``parentage`` and cached."""
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Span(self, i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]

524 

525 

class _Overrun:
    # Sentinel result for runs that exhausted their data/choice budget.
    # Carries only a status so it can stand in where a result's status
    # is inspected.
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# Module-level singleton; compared by identity throughout.
Overrun = _Overrun()

# Monotonically increasing id assigned to each ConjectureData on creation.
global_test_counter = 0


# Maximum permitted span nesting depth (see ConjectureData.depth).
MAX_DEPTH = 100

539 

540 

class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache.

    All methods are no-op defaults; subclasses override the ones
    they care about.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Called after an observed integer draw."""
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Called after an observed float draw."""
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Called after an observed string draw."""
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Called after an observed bytes draw."""
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Called after an observed boolean draw."""
        pass

584 

585 

@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    # Choice nodes; excluded from equality/repr for size & speed.
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    # Aggregated targeted-search observations (label -> metric).
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    # Which-parts-matter slice indices and explanations.
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    # Set when replaying a prefix disagreed with a requested draw.
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Already a result; mirrors ConjectureData.as_result().
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return tuple(node.value for node in self.nodes)

615 

616 

617class ConjectureData: 

    @classmethod
    def for_choices(
        cls,
        choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
        *,
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        random: Optional[Random] = None,
    ) -> "ConjectureData":
        """Alternate constructor that replays a fixed choice sequence.

        ``choices`` becomes the prefix and also bounds ``max_choices``,
        so drawing past it overruns rather than generating fresh values.
        """
        # Local import to avoid a circular dependency with the engine module.
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )

636 

    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        """Set up a fresh test-case recorder.

        ``provider`` may be a PrimitiveProvider class (instantiated here
        with ``provider_kw``) or an instance (in which case ``provider_kw``
        must not be passed). ``prefix`` is replayed before new values are
        generated; ``max_choices`` caps the number of recorded choices.
        """
        # Local import to avoid a circular dependency with the engine module.
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense when we are the ones instantiating.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        # Assign this data a globally unique, monotonically increasing id.
        global global_test_counter
        self.testcounter = global_test_counter
        global_test_counter += 1
        # Wall-clock and cumulative-GC time at start, for timing stats.
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        # Instantiate provider classes here; instances are used as-is.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )
        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, tuple[int, Any]] = {}
        self.hypothesis_runner = not_set

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        # Open the single top-level span wrapping the whole choice sequence.
        self.start_span(TOP_LABEL)

733 

734 def __repr__(self) -> str: 

735 return "ConjectureData(%s, %d choices%s)" % ( 

736 self.status.name, 

737 len(self.nodes), 

738 ", frozen" if self.frozen else "", 

739 ) 

740 

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        # The raw choice values in draw order, one per recorded node.
        return tuple(node.value for node in self.nodes)

744 

    # draw_* functions might be called in one of two contexts: either "above" or
    # "below" the choice sequence. For instance, draw_string calls draw_boolean
    # from ``many`` when calculating the number of characters to return. We do
    # not want these choices to get written to the choice sequence, because they
    # are not true choices themselves.
    #
    # `observe` formalizes this. The choice will only be written to the choice
    # sequence if observe is True.

    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: Optional[int],
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: Optional[float],
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: Optional[str],
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: Optional[bytes],
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: Optional[bool],
    ) -> bool: ...

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        """Shared implementation behind all the public draw_* methods.

        Resolves the value from (in priority order) the replay prefix, the
        ``forced`` argument, or the provider; then, if ``observe`` is true,
        notifies the observer and appends a ChoiceNode.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            # Replay from the prefix; _pop_choice handles misalignment.
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        # ``forced`` always wins, even over a replayed prefix value.
        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            # Mirror the draw to the observer (e.g. the tree cache).
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            # Size accounting is skipped for providers that must not realize
            # symbolic values.
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value

879 

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> int:
        """Draw an integer in [min_value, max_value], optionally weighted.

        ``forced`` pins the result; ``observe=False`` keeps the draw out of
        the choice sequence. Argument validation is via asserts: callers
        inside Hypothesis are trusted to pass coherent values.
        """
        # Validate arguments
        if weights is not None:
            # Weighted draws require a fully bounded range.
            assert min_value is not None
            assert max_value is not None
            assert len(weights) <= 255  # arbitrary practical limit
            # We can and should eventually support total weights. But this
            # complicates shrinking as we can no longer assume we can force
            # a value to the unmapped probability mass if that mass might be 0.
            assert sum(weights.values()) < 1
            # similarly, things get simpler if we assume every value is possible.
            # we'll want to drop this restriction eventually.
            assert all(w != 0 for w in weights.values())

        if forced is not None and min_value is not None:
            assert min_value <= forced
        if forced is not None and max_value is not None:
            assert forced <= max_value

        # Constraints dicts are interned via the pooled cache for reuse.
        constraints: IntegerConstraints = self._pooled_constraints(
            "integer",
            {
                "min_value": min_value,
                "max_value": max_value,
                "weights": weights,
                "shrink_towards": shrink_towards,
            },
        )
        return self._draw("integer", constraints, observe=observe, forced=forced)

918 

    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: Optional[float] = None,
        observe: bool = True,
    ) -> float:
        """Draw a float in the given (sign-aware) range.

        ``allow_nan`` permits nan results; ``smallest_nonzero_magnitude``
        bounds how close to zero nonzero results may get.
        """
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        # Defensive runtime check (asserts may be stripped under -O): a zero
        # here indicates subnormals were flushed, e.g. by -ffast-math.
        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            # sign_aware_lte distinguishes -0.0 from 0.0 at the boundaries.
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)

962 

def draw_string(
    self,
    intervals: IntervalSet,
    *,
    min_size: int = 0,
    max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
    forced: Optional[str] = None,
    observe: bool = True,
) -> str:
    """Draw a string whose characters are drawn from ``intervals`` and whose
    length lies in ``[min_size, max_size]``.

    ``forced`` pins the draw to a specific (valid) string; ``observe`` is
    passed through to ``_draw``.
    """
    if forced is not None:
        assert min_size <= len(forced) <= max_size
    assert min_size >= 0
    if len(intervals) == 0:
        # With no characters available, only the empty string can be drawn.
        assert min_size == 0

    string_constraints: StringConstraints = self._pooled_constraints(
        "string",
        {
            "intervals": intervals,
            "min_size": min_size,
            "max_size": max_size,
        },
    )
    return self._draw("string", string_constraints, observe=observe, forced=forced)

986 

def draw_bytes(
    self,
    min_size: int = 0,
    max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
    *,
    forced: Optional[bytes] = None,
    observe: bool = True,
) -> bytes:
    """Draw a bytestring whose length lies in ``[min_size, max_size]``.

    ``forced`` pins the draw to a specific (valid) bytestring; ``observe``
    is passed through to ``_draw``.
    """
    if forced is not None:
        assert min_size <= len(forced) <= max_size
    assert min_size >= 0

    bytes_constraints: BytesConstraints = self._pooled_constraints(
        "bytes", {"min_size": min_size, "max_size": max_size}
    )
    return self._draw("bytes", bytes_constraints, observe=observe, forced=forced)

1002 

def draw_boolean(
    self,
    p: float = 0.5,
    *,
    forced: Optional[bool] = None,
    observe: bool = True,
) -> bool:
    """Draw a boolean which is True with probability ``p``.

    ``forced`` pins the outcome (which must then have nonzero probability);
    ``observe`` is passed through to ``_draw``.
    """
    # A forced outcome must be possible under p.
    if forced is True:
        assert p > 0
    if forced is False:
        assert p < 1

    bool_constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
    return self._draw("boolean", bool_constraints, observe=observe, forced=forced)

1015 

# The overloads below give each choice type a precise constraints type, so
# callers like draw_integer get back the same TypedDict type they passed in.
@overload
def _pooled_constraints(
    self, choice_type: Literal["integer"], constraints: IntegerConstraints
) -> IntegerConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["float"], constraints: FloatConstraints
) -> FloatConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["string"], constraints: StringConstraints
) -> StringConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["bytes"], constraints: BytesConstraints
) -> BytesConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["boolean"], constraints: BooleanConstraints
) -> BooleanConstraints: ...

def _pooled_constraints(
    self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> ChoiceConstraintsT:
    """Memoize common dictionary objects to reduce memory pressure.

    Returns a previously-seen constraints dict equal to ``constraints``
    when available, so repeated draws with identical constraints share
    one dict object instead of allocating a new one per draw.
    """
    # caching runs afoul of nondeterminism checks
    if self.provider.avoid_realization:
        return constraints

    # choice_type is part of the key because different choice types may
    # produce colliding constraint keys.
    key = (choice_type, *choice_constraints_key(choice_type, constraints))
    try:
        return POOLED_CONSTRAINTS_CACHE[key]
    except KeyError:
        POOLED_CONSTRAINTS_CACHE[key] = constraints
        return constraints

1055 

def _pop_choice(
    self,
    choice_type: ChoiceTypeT,
    constraints: ChoiceConstraintsT,
    *,
    forced: Optional[ChoiceT],
) -> ChoiceT:
    """Take the next choice from ``self.prefix``, advancing ``self.index``.

    If the next prefix entry is a ``ChoiceTemplate``, materialize a concrete
    choice from it. Otherwise return the stored choice, unless it is
    misaligned with the requested ``choice_type``/``constraints``, in which
    case a fresh simplest (index-0) choice is substituted.

    Raises StopTest (via ``mark_overrun``) when the template budget is
    exhausted or no permitted choice can be produced.
    """
    assert self.prefix is not None
    # checked in _draw
    assert self.index < len(self.prefix)

    value = self.prefix[self.index]
    if isinstance(value, ChoiceTemplate):
        node: ChoiceTemplate = value
        if node.count is not None:
            assert node.count >= 0
        # node templates have to be at the end for now, since it's not immediately
        # apparent how to handle overruning a node template while generating a single
        # node if the alternative is not "the entire data is an overrun".
        assert self.index == len(self.prefix) - 1
        if node.type == "simplest":
            if forced is not None:
                # A forced draw must return the forced value. (Bugfix: this
                # assignment was previously dead — the try below ran
                # unconditionally and overwrote ``choice`` with the simplest
                # value, silently ignoring ``forced``.)
                choice = forced
            else:
                try:
                    choice = choice_from_index(0, choice_type, constraints)
                except ChoiceTooLarge:
                    self.mark_overrun()
        else:
            raise NotImplementedError

        if node.count is not None:
            node.count -= 1
            if node.count < 0:
                self.mark_overrun()
        return choice

    choice = value
    node_choice_type = {
        str: "string",
        float: "float",
        int: "integer",
        bool: "boolean",
        bytes: "bytes",
    }[type(choice)]
    # If we're trying to:
    # * draw a different choice type at the same location
    # * draw the same choice type with a different constraints, which does not permit
    #   the current value
    #
    # then we call this a misalignment, because the choice sequence has
    # changed from what we expected at some point. An easy misalignment is
    #
    #   one_of(integers(0, 100), integers(101, 200))
    #
    # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
    # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
    # index 1 (which does not permit any of the values 0-100).
    #
    # When the choice sequence becomes misaligned, we generate a new value of the
    # type and constraints the strategy expects.
    if node_choice_type != choice_type or not choice_permitted(choice, constraints):
        # only track first misalignment for now.
        if self.misaligned_at is None:
            self.misaligned_at = (self.index, choice_type, constraints, forced)
        try:
            # Fill in any misalignments with index 0 choices. An alternative to
            # this is using the index of the misaligned choice instead
            # of index 0, which may be useful for maintaining
            # "similarly-complex choices" in the shrinker. This requires
            # attaching an index to every choice in ConjectureData.for_choices,
            # which we don't always have (e.g. when reading from db).
            #
            # If we really wanted this in the future we could make this complexity
            # optional, use it if present, and default to index 0 otherwise.
            # This complicates our internal api and so I'd like to avoid it
            # if possible.
            #
            # Additionally, I don't think slips which require
            # slipping to high-complexity values are common. Though arguably
            # we may want to expand a bit beyond *just* the simplest choice.
            # (we could for example consider sampling choices from index 0-10).
            choice = choice_from_index(0, choice_type, constraints)
        except ChoiceTooLarge:
            # should really never happen with a 0-index choice, but let's be safe.
            self.mark_overrun()

    self.index += 1
    return choice

1144 

def as_result(self) -> Union[ConjectureResult, _Overrun]:
    """Convert the result of running this test into
    either an Overrun object or a ConjectureResult."""
    assert self.frozen
    if self.status == Status.OVERRUN:
        # Overruns carry no data worth keeping; share the singleton.
        return Overrun
    if self.__result is None:
        # Only attach extra_information when something was actually recorded.
        extra_information = (
            self.extra_information
            if self.extra_information.has_information()
            else None
        )
        self.__result = ConjectureResult(
            status=self.status,
            interesting_origin=self.interesting_origin,
            spans=self.spans,
            nodes=self.nodes,
            length=self.length,
            output=self.output,
            expected_traceback=self.expected_traceback,
            expected_exception=self.expected_exception,
            extra_information=extra_information,
            has_discards=self.has_discards,
            target_observations=self.target_observations,
            tags=frozenset(self.tags),
            arg_slices=self.arg_slices,
            slice_comments=self.slice_comments,
            misaligned_at=self.misaligned_at,
            cannot_proceed_scope=self.cannot_proceed_scope,
        )
    assert self.__result is not None
    return self.__result

1177 

def __assert_not_frozen(self, name: str) -> None:
    """Raise Frozen if ``name`` is being called after this data was frozen."""
    if not self.frozen:
        return
    raise Frozen(f"Cannot call {name} on frozen ConjectureData")

1181 

def note(self, value: Any) -> None:
    """Append ``value`` to this test's output, repr-ing non-strings."""
    self.__assert_not_frozen("note")
    text = value if isinstance(value, str) else repr(value)
    self.output += text

1187 

def draw(
    self,
    strategy: "SearchStrategy[Ex]",
    label: Optional[int] = None,
    observe_as: Optional[str] = None,
) -> "Ex":
    """Draw a value from ``strategy``, recording a span around the draw.

    Top-level draws additionally record wall-clock draw time (net of GC)
    in ``self.draw_times`` and, when observability callbacks are active,
    a JSON-able form of the drawn value in ``self._observability_args``.
    ``observe_as`` names the draw in those records.
    """
    # Imported here, not at module scope — presumably to avoid an import
    # cycle; confirm before hoisting.
    from hypothesis.internal.observability import TESTCASE_CALLBACKS
    from hypothesis.strategies._internal.lazy import unwrap_strategies
    from hypothesis.strategies._internal.utils import to_jsonable

    if self.is_find and not strategy.supports_find:
        raise InvalidArgument(
            f"Cannot use strategy {strategy!r} within a call to find "
            "(presumably because it would be invalid after the call had ended)."
        )

    at_top_level = self.depth == 0
    start_time = None
    if at_top_level:
        # We start this timer early, because accessing attributes on a LazyStrategy
        # can be almost arbitrarily slow. In cases like characters() and text()
        # where we cache something expensive, this led to Flaky deadline errors!
        # See https://github.com/HypothesisWorks/hypothesis/issues/2108
        start_time = time.perf_counter()
        gc_start_time = gc_cumulative_time()

    strategy.validate()

    if strategy.is_empty:
        self.mark_invalid(f"empty strategy {self!r}")

    if self.depth >= MAX_DEPTH:
        self.mark_invalid("max depth exceeded")

    # Jump directly to the unwrapped strategy for the label and for do_draw.
    # This avoids adding an extra span to all lazy strategies.
    unwrapped = unwrap_strategies(strategy)
    if label is None:
        label = unwrapped.label
    assert isinstance(label, int)

    self.start_span(label=label)
    try:
        if not at_top_level:
            # Nested draws skip all the timing/observability bookkeeping.
            return unwrapped.do_draw(self)
        assert start_time is not None
        key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
        try:
            try:
                v = unwrapped.do_draw(self)
            finally:
                # Subtract the time spent in GC to avoid overcounting, as it is
                # accounted for at the overall example level.
                in_gctime = gc_cumulative_time() - gc_start_time
                self.draw_times[key] = time.perf_counter() - start_time - in_gctime
        except Exception as err:
            # Attach context so tracebacks say which argument was being drawn.
            add_note(
                err,
                f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
            )
            raise
        if TESTCASE_CALLBACKS:
            avoid = self.provider.avoid_realization
            self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
        return v
    finally:
        # Close the span even when do_draw raised or concluded the test.
        self.stop_span()

1255 

def start_span(self, label: int) -> None:
    """Open a new span labelled ``label``, nested inside the current one.

    Must be balanced by a later ``stop_span``; ``freeze`` force-closes
    any spans still open.
    """
    self.provider.span_start(label)
    self.__assert_not_frozen("start_span")
    self.depth += 1
    # Logically it would make sense for this to just be
    # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
    # be until we ran the code under tracemalloc and found a rather significant
    # chunk of allocation was happening here. This was presumably due to varargs
    # or the like, but we didn't investigate further given that it was easy
    # to fix with this check.
    if self.depth > self.max_depth:
        self.max_depth = self.depth
    self.__span_record.start_span(label)
    # Track the set of labels seen inside this span, for structural coverage.
    self.labels_for_structure_stack.append({label})

1270 

def stop_span(self, *, discard: bool = False) -> None:
    """Close the innermost open span.

    ``discard=True`` marks the span's draws as rejected (e.g. a filtered
    value), which both flags the data as containing discards and prunes
    this branch of the search tree via the observer.
    """
    self.provider.span_end(discard)
    if self.frozen:
        # No further span bookkeeping once the data has been frozen.
        return
    if discard:
        self.has_discards = True
    self.depth -= 1
    assert self.depth >= -1
    self.__span_record.stop_span(discard=discard)

    labels_for_structure = self.labels_for_structure_stack.pop()

    if not discard:
        if self.labels_for_structure_stack:
            # Propagate this span's labels up to the enclosing span.
            self.labels_for_structure_stack[-1].update(labels_for_structure)
        else:
            # Closed the outermost span: record structural-coverage tags.
            self.tags.update([structural_coverage(l) for l in labels_for_structure])

    if discard:
        # Once we've discarded a span, every test case starting with
        # this prefix contains discards. We prune the tree at that point so
        # as to avoid future test cases bothering with this region, on the
        # assumption that some span that you could have used instead
        # there would *not* trigger the discard. This greatly speeds up
        # test case generation in some cases, because it allows us to
        # ignore large swathes of the search space that are effectively
        # redundant.
        #
        # A scenario that can cause us problems but which we deliberately
        # have decided not to support is that if there are side effects
        # during data generation then you may end up with a scenario where
        # every good test case generates a discard because the discarded
        # section sets up important things for later. This is not terribly
        # likely and all that you see in this case is some degradation in
        # quality of testing, so we don't worry about it.
        #
        # Note that killing the branch does *not* mean we will never
        # explore below this point, and in particular we may do so during
        # shrinking. Any explicit request for a data object that starts
        # with the branch here will work just fine, but novel prefix
        # generation will avoid it, and we can use it to detect when we
        # have explored the entire tree (up to redundancy).

        self.observer.kill_branch()

1315 

@property
def spans(self) -> Spans:
    """The span tree for this (frozen) data, built lazily on first access."""
    assert self.frozen
    spans = self.__spans
    if spans is None:
        spans = Spans(record=self.__span_record)
        self.__spans = spans
    return spans

1322 

def freeze(self) -> None:
    """Finalize this data: stop the clocks, close all open spans, and
    notify the observer of the final status. Idempotent; after freezing,
    further draws and span operations are rejected.
    """
    if self.frozen:
        return
    self.finish_time = time.perf_counter()
    self.gc_finish_time = gc_cumulative_time()

    # Always finish by closing all remaining spans so that we have a valid tree.
    while self.depth >= 0:
        self.stop_span()

    self.__span_record.freeze()
    self.frozen = True
    self.observer.conclude_test(self.status, self.interesting_origin)

1336 

def choice(
    self,
    values: Sequence[T],
    *,
    forced: Optional[T] = None,
    observe: bool = True,
) -> T:
    """Draw one element of ``values``, implemented as an integer index draw.

    ``forced`` pins the result to a particular element (which must be
    present in ``values``); ``observe`` is passed through to the draw.
    """
    if forced is None:
        forced_index = None
    else:
        # ValueError here means the forced value isn't in ``values``.
        forced_index = values.index(forced)
    index = self.draw_integer(
        0,
        len(values) - 1,
        forced=forced_index,
        observe=observe,
    )
    return values[index]

1352 

def conclude_test(
    self,
    status: Status,
    interesting_origin: Optional[InterestingOrigin] = None,
) -> NoReturn:
    """Record ``status``, freeze this data, and abort the test via StopTest.

    ``interesting_origin`` may only be supplied for INTERESTING results.
    """
    if interesting_origin is not None:
        assert status == Status.INTERESTING
    self.__assert_not_frozen("conclude_test")
    self.interesting_origin = interesting_origin
    self.status = status
    self.freeze()
    raise StopTest(self.testcounter)

1364 

def mark_interesting(
    self, interesting_origin: Optional[InterestingOrigin] = None
) -> NoReturn:
    """Conclude this test case as INTERESTING (a failure), optionally
    recording where the failure originated, and raise StopTest."""
    self.conclude_test(Status.INTERESTING, interesting_origin=interesting_origin)

1369 

def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
    """Conclude this test case as INVALID, optionally recording ``why``
    in the events log, and raise StopTest."""
    if why is not None:
        self.events["invalid because"] = why
    self.conclude_test(Status.INVALID)

1374 

def mark_overrun(self) -> NoReturn:
    """Conclude this test case as OVERRUN (the available choice sequence
    was exhausted or exceeded its budget) and raise StopTest."""
    self.conclude_test(Status.OVERRUN)

1377 

1378 

def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single standalone choice of ``choice_type`` under
    ``constraints``, using a throwaway ConjectureData seeded by ``random``."""
    data = ConjectureData(random=random)
    draw_method = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_method(**constraints))