Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/data.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

618 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import math 

12import time 

13from collections import defaultdict 

14from collections.abc import Hashable, Iterable, Iterator, Sequence 

15from dataclasses import dataclass, field 

16from enum import IntEnum 

17from functools import cached_property 

18from random import Random 

19from typing import ( 

20 TYPE_CHECKING, 

21 Any, 

22 Literal, 

23 NoReturn, 

24 TypeAlias, 

25 TypeVar, 

26 cast, 

27 overload, 

28) 

29 

30from hypothesis.errors import ( 

31 CannotProceedScopeT, 

32 ChoiceTooLarge, 

33 Frozen, 

34 InvalidArgument, 

35 StopTest, 

36) 

37from hypothesis.internal.cache import LRUCache 

38from hypothesis.internal.compat import add_note 

39from hypothesis.internal.conjecture.choice import ( 

40 BooleanConstraints, 

41 BytesConstraints, 

42 ChoiceConstraintsT, 

43 ChoiceNode, 

44 ChoiceT, 

45 ChoiceTemplate, 

46 ChoiceTypeT, 

47 FloatConstraints, 

48 IntegerConstraints, 

49 StringConstraints, 

50 choice_constraints_key, 

51 choice_from_index, 

52 choice_permitted, 

53 choices_size, 

54) 

55from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time 

56from hypothesis.internal.conjecture.providers import ( 

57 COLLECTION_DEFAULT_MAX_SIZE, 

58 HypothesisProvider, 

59 PrimitiveProvider, 

60) 

61from hypothesis.internal.conjecture.utils import calc_label_from_name 

62from hypothesis.internal.escalation import InterestingOrigin 

63from hypothesis.internal.floats import ( 

64 SMALLEST_SUBNORMAL, 

65 float_to_int, 

66 int_to_float, 

67 sign_aware_lte, 

68) 

69from hypothesis.internal.intervalsets import IntervalSet 

70from hypothesis.internal.observability import PredicateCounts 

71from hypothesis.reporting import debug_report 

72from hypothesis.utils.conventions import not_set 

73from hypothesis.utils.deprecation import note_deprecation 

74from hypothesis.utils.threading import ThreadLocal 

75 

76if TYPE_CHECKING: 

77 from hypothesis.strategies import SearchStrategy 

78 from hypothesis.strategies._internal.core import DataObject 

79 from hypothesis.strategies._internal.random import RandomState 

80 from hypothesis.strategies._internal.strategies import Ex 

81 

82 

83def __getattr__(name: str) -> Any: 

84 if name == "AVAILABLE_PROVIDERS": 

85 from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS 

86 

87 note_deprecation( 

88 "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to " 

89 "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.", 

90 since="2025-01-25", 

91 has_codemod=False, 

92 stacklevel=1, 

93 ) 

94 return AVAILABLE_PROVIDERS 

95 

96 raise AttributeError( 

97 f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}" 

98 ) 

99 

100 

# Generic type variable for typed helpers in this module.
T = TypeVar("T")
# Mapping of target label -> observed score, aggregated for targeted PBT.
TargetObservations = dict[str, int | float]
# index, choice_type, constraints, forced value
MisalignedAt: TypeAlias = tuple[int, ChoiceTypeT, ChoiceConstraintsT, ChoiceT | None]

# Label for the single top-level span wrapping each test case (see
# ConjectureData.__init__, which opens it via start_span).
TOP_LABEL = calc_label_from_name("top")
MAX_DEPTH = 100

# Thread-local counter used to number ConjectureData instances
# (read and incremented in ConjectureData.__init__).
threadlocal = ThreadLocal(global_test_counter=int)

110 

111 

class Status(IntEnum):
    """Outcome classification for a single test-case execution."""

    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        # e.g. "Status.VALID" — shorter than the default IntEnum repr.
        return "Status." + self.name

120 

121 

@dataclass(slots=True, frozen=True)
class StructuralCoverageTag:
    # Immutable, hashable wrapper around a span label, used as a
    # structural-coverage tag on test cases.
    label: int


# Interning cache: at most one shared StructuralCoverageTag per label,
# populated by structural_coverage().
STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}

128 

129 

def structural_coverage(label: int) -> StructuralCoverageTag:
    """Return the interned StructuralCoverageTag for ``label``.

    Tags are cached in STRUCTURAL_COVERAGE_CACHE so repeated lookups of the
    same label share a single object.
    """
    tag = STRUCTURAL_COVERAGE_CACHE.get(label)
    if tag is None:
        # setdefault so that whichever tag lands in the cache first wins.
        tag = STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
    return tag


# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)

140 

141 

class Span:
    """A span tracks the hierarchical structure of choices within a single test run.

    Spans are created to mark regions of the choice sequence that are
    logically related to each other. For instance, Hypothesis tracks:
    - A single top-level span for the entire choice sequence
    - A span for the choices made by each strategy
    - Some strategies define additional spans within their choices. For instance,
      st.lists() tracks the "should add another element" choice and the "add
      another element" choices as separate spans.

    Spans provide useful information to the shrinker, mutator, targeted PBT,
    and other subsystems of Hypothesis.

    Rather than store each ``Span`` as a rich object, it is actually
    just an index into the ``Spans`` class defined below. This has two
    purposes: Firstly, for most properties of spans we will never need
    to allocate storage at all, because most properties are not used on
    most spans. Secondly, by storing the spans as compact lists
    of integers, we save a considerable amount of space compared to
    Python's normal object size.

    This does have the downside that it increases the amount of allocation
    we do, and slows things down as a result, in some usage patterns because
    we repeatedly allocate the same Span or int objects, but it will
    often dramatically reduce our memory usage, so is worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        # All real data lives in ``owner`` (a Spans collection); this object
        # is just a typed cursor into it.
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        # Two spans are equal only if they index the same position of the
        # *same* Spans collection (owner compared by identity, not equality).
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        return (self.owner is other.owner) and (self.index == other.index)

    def __ne__(self, other: object) -> bool:
        # Explicit mirror of __eq__, preserving NotImplemented for non-Spans.
        if self is other:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return (self.owner is not other.owner) or (self.index != other.index)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """A label is an opaque value that associates each span with its
        approximate origin, such as a particular strategy class or a particular
        kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self) -> int | None:
        """The index of the span that this one is nested directly within."""
        if self.index == 0:
            # The top-level span has no parent.
            return None
        return self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """Index of the first choice covered by this span."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        """Index one past the last choice covered by this span."""
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """
        Depth of this span in the span tree. The top-level span has a depth of 0.
        """
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``. This means we believe that the shrinker should be able to delete
        this span completely, without affecting the value produced by its enclosing
        strategy. Typically set when a rejection sampler decides to reject a
        generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """The list of all spans with this as a parent, in increasing index
        order."""
        return [self.owner[i] for i in self.owner.children[self.index]]

241 

242 

class SpanProperty:
    """There are many properties of spans that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in SpanProperty.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, spans: "Spans"):
        # Indices of currently-open spans during replay, innermost last.
        self.span_stack: list[int] = []
        self.spans = spans
        # Number of spans started so far during this replay.
        self.span_count = 0
        # Number of choices seen so far during this replay.
        self.choice_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        # The trail is a flat integer encoding: stop markers, choice markers,
        # and span-start label indices (see TrailType).
        for record in self.spans.trail:
            if record == TrailType.STOP_SPAN_DISCARD:
                self.__pop(discarded=True)
            elif record == TrailType.STOP_SPAN_NO_DISCARD:
                self.__pop(discarded=False)
            elif record == TrailType.CHOICE:
                self.choice_count += 1
            else:
                # everything after TrailType.CHOICE is the label of a span start.
                self.__push(record - TrailType.CHOICE - 1)

        return self.finish()

    def __push(self, label_index: int) -> None:
        i = self.span_count
        assert i < len(self.spans)
        self.start_span(i, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        i = self.span_stack.pop()
        self.stop_span(i, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        # Subclasses must return their computed property here.
        raise NotImplementedError

297 

298 

class TrailType(IntEnum):
    """Markers making up ``SpanRecord.trail``, the flat replayable encoding
    of span starts/stops and choices."""

    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    CHOICE = 3
    # every trail element larger than TrailType.CHOICE is the label of a span
    # start, offset by its index. So the first span label is stored as 4, the
    # second as 5, etc, regardless of its actual integer label.

306 

307 

class SpanRecord:
    """Compact recording of ``start_span`` / ``stop_span`` / choice events.

    The recorded trail is stored in ``Spans`` and replayed (via
    ``SpanProperty``) whenever we need structural information about
    individual ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        # Distinct labels in first-seen order; the trail stores indices into
        # this list rather than raw label values.
        self.labels: list[int] = []
        # label -> position in self.labels. Set to None by freeze() so that
        # further start_span calls fail loudly.
        self.__index_of_labels: dict[int, int] | None = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        index_of = self.__index_of_labels
        assert index_of is not None
        if label in index_of:
            i = index_of[label]
        else:
            # New label: intern it and remember its index.
            i = index_of.setdefault(label, len(self.labels))
            self.labels.append(label)
        # Span starts are encoded as label index offset past TrailType.CHOICE.
        self.trail.append(TrailType.CHOICE + 1 + i)

    def stop_span(self, *, discard: bool) -> None:
        marker = (
            TrailType.STOP_SPAN_DISCARD if discard else TrailType.STOP_SPAN_NO_DISCARD
        )
        self.trail.append(marker)

345 

346 

class _starts_and_ends(SpanProperty):
    """Visitor computing the (start, end) choice offsets of every span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        n = len(self.spans)
        self.starts = IntList.of_length(n)
        self.ends = IntList.of_length(n)

    def start_span(self, i: int, label_index: int) -> None:
        # A span begins at however many choices have been replayed so far.
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return self.starts, self.ends

361 

362 

class _discarded(SpanProperty):
    """Visitor collecting the indices of spans stopped with discard=True."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)

    def finish(self) -> frozenset[int]:
        # Frozen so the result can be cached and shared safely.
        return frozenset(self.result)

374 

375 

class _parentage(SpanProperty):
    """Visitor computing, for each span, the index of its parent span."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if i == 0:
            # The top-level span has no parent; leave its slot at zero.
            return
        # When span i stops, its parent is whatever span is still open on
        # top of the stack.
        self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result

387 

388 

class _depths(SpanProperty):
    """Visitor computing each span's nesting depth (top level is 0)."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        # Depth equals how many spans are already open when this one starts.
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result

399 

400 

class _label_indices(SpanProperty):
    """Visitor recording, per span, the index of its label in ``spans.labels``."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result

411 

412 

class _mutator_groups(SpanProperty):
    """Visitor grouping span (start, end) choice ranges by label, for the
    mutator to swap between."""

    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        span = self.spans[i]
        self.groups[label_index].add((span.start, span.end))

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # Discard groups with only one span, since the mutator can't
        # do anything useful with them.
        return [group for group in self.groups.values() if len(group) >= 2]

429 

430 

class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the record of recorded behaviour in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        # Every span contributes exactly one stop marker, so the number of
        # spans is the number of stop markers (of either kind) in the trail.
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        # Lazily computed by the ``children`` property.
        self.__children: list[Sequence[int]] | None = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        # Computed once per collection by replaying the trail.
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        # Lazily built inverse of ``parentage``: per-span lists of child indices.
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        # List-like indexing, including support for negative indices.
        n = self.__length
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Span(self, i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]

513 

514 

class _Overrun:
    # Sentinel result type for test cases that exhausted their choice/length
    # budget; only the ``Overrun`` singleton below is ever used.
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


# Module-level singleton; compared by identity throughout the codebase.
Overrun = _Overrun()

523 

524 

class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache.

    All hooks are no-ops by default; subclasses override the ones they need.
    """

    def conclude_test(
        self,
        status: Status,
        interesting_origin: InterestingOrigin | None,
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        """Called for each observed integer draw; no-op by default."""
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        """Called for each observed float draw; no-op by default."""
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        """Called for each observed string draw; no-op by default."""
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        """Called for each observed bytes draw; no-op by default."""
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        """Called for each observed boolean draw; no-op by default."""
        pass

568 

569 

@dataclass(slots=True, frozen=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status
    interesting_origin: InterestingOrigin | None
    # Kept out of repr and equality comparisons (potentially large).
    nodes: tuple[ChoiceNode, ...] = field(repr=False, compare=False)
    length: int
    output: str
    expected_exception: BaseException | None
    expected_traceback: str | None
    has_discards: bool
    target_observations: TargetObservations
    tags: frozenset[StructuralCoverageTag]
    # Kept out of repr and equality comparisons (potentially large).
    spans: Spans = field(repr=False, compare=False)
    arg_slices: set[tuple[int, int]] = field(repr=False)
    slice_comments: dict[tuple[int, int], str] = field(repr=False)
    misaligned_at: MisalignedAt | None = field(repr=False)
    cannot_proceed_scope: CannotProceedScopeT | None = field(repr=False)

    def as_result(self) -> "ConjectureResult":
        # Already a result, so this is the identity.
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        # The raw choice values, without constraints or other node metadata.
        return tuple(node.value for node in self.nodes)

598 

599 

600class ConjectureData: 

    @classmethod
    def for_choices(
        cls,
        choices: Sequence[ChoiceTemplate | ChoiceT],
        *,
        observer: DataObserver | None = None,
        provider: PrimitiveProvider | type[PrimitiveProvider] = HypothesisProvider,
        random: Random | None = None,
    ) -> "ConjectureData":
        """Alternate constructor: a ConjectureData that replays ``choices``
        as its prefix, with ``max_choices`` capped at exactly that many
        choices."""
        # Imported here to avoid a circular import with the engine module.
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )

619 

    def __init__(
        self,
        *,
        random: Random | None,
        observer: DataObserver | None = None,
        provider: PrimitiveProvider | type[PrimitiveProvider] = HypothesisProvider,
        prefix: Sequence[ChoiceTemplate | ChoiceT] | None = None,
        max_choices: int | None = None,
        provider_kw: dict[str, Any] | None = None,
    ) -> None:
        """Set up a fresh data object.

        ``prefix`` supplies choices to replay before the provider draws
        fresh ones (see ``_draw``); ``max_choices`` caps the number of
        choices; ``provider_kw`` is passed through to ``provider`` when it
        is given as a class rather than an instance.
        """
        # Imported here to avoid a circular import with the engine module.
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            # provider_kw only makes sense when we are the ones instantiating
            # the provider.
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.overdraw = 0
        self._random = random

        self.length: int = 0
        self.index: int = 0
        self.output: str = ""
        self.status: Status = Status.VALID
        self.frozen: bool = False
        # Unique (per-thread) sequence number for this data object.
        self.testcounter: int = threadlocal.global_test_counter
        threadlocal.global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, str | int | float] = {}
        self.interesting_origin: InterestingOrigin | None = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth: int = 0
        self.has_discards: bool = False

        # Instantiate the provider if a class was passed; otherwise use the
        # instance as-is.
        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: ConjectureResult | None = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Spans | None = None

        # We want the top level span to have depth 0, so we start at -1.
        self.depth: int = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict[str, PredicateCounts] = defaultdict(
            PredicateCounts
        )

        self._sampled_from_all_strategies_elements_message: (
            tuple[str, object] | None
        ) = None
        self._shared_strategy_draws: dict[Hashable, tuple[Any, SearchStrategy]] = {}
        self._shared_data_strategy: DataObject | None = None
        self._stateful_repr_parts: list[Any] | None = None
        self.states_for_ids: dict[int, RandomState] | None = None
        self.seeds_to_states: dict[Any, RandomState] | None = None
        self.hypothesis_runner: Any = not_set

        self.expected_exception: BaseException | None = None
        self.expected_traceback: str | None = None

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: MisalignedAt | None = None
        self.cannot_proceed_scope: CannotProceedScopeT | None = None
        # Open the single top-level span that wraps the whole test case.
        self.start_span(TOP_LABEL)

717 

718 def __repr__(self) -> str: 

719 return "ConjectureData(%s, %d choices%s)" % ( 

720 self.status.name, 

721 len(self.nodes), 

722 ", frozen" if self.frozen else "", 

723 ) 

724 

725 @property 

726 def choices(self) -> tuple[ChoiceT, ...]: 

727 return tuple(node.value for node in self.nodes) 

728 

729 # draw_* functions might be called in one of two contexts: either "above" or 

730 # "below" the choice sequence. For instance, draw_string calls draw_boolean 

731 # from ``many`` when calculating the number of characters to return. We do 

732 # not want these choices to get written to the choice sequence, because they 

733 # are not true choices themselves. 

734 # 

735 # `observe` formalizes this. The choice will only be written to the choice 

736 # sequence if observe is True. 

737 

    # Typed overloads for _draw: one per choice type, so callers get a
    # precise return type. The shared implementation follows below.
    @overload
    def _draw(
        self,
        choice_type: Literal["integer"],
        constraints: IntegerConstraints,
        *,
        observe: bool,
        forced: int | None,
    ) -> int: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["float"],
        constraints: FloatConstraints,
        *,
        observe: bool,
        forced: float | None,
    ) -> float: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["string"],
        constraints: StringConstraints,
        *,
        observe: bool,
        forced: str | None,
    ) -> str: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["bytes"],
        constraints: BytesConstraints,
        *,
        observe: bool,
        forced: bytes | None,
    ) -> bytes: ...

    @overload
    def _draw(
        self,
        choice_type: Literal["boolean"],
        constraints: BooleanConstraints,
        *,
        observe: bool,
        forced: bool | None,
    ) -> bool: ...

787 

    def _draw(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        observe: bool,
        forced: ChoiceT | None,
    ) -> ChoiceT:
        """Shared implementation behind all draw_* methods.

        The value comes from the replay prefix, the provider, or ``forced``;
        nan floats are canonicalized; and, when ``observe`` is True, the
        observer is notified and the resulting ChoiceNode is appended to the
        choice sequence.
        """
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            # Still replaying the supplied prefix.
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            # Fresh draw, dispatched to the provider's draw_<type> method.
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float":
            assert isinstance(value, float)
            if math.isnan(value):
                # Canonicalize via a bit-level round trip.
                value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value

863 

    def draw_integer(
        self,
        min_value: int | None = None,
        max_value: int | None = None,
        *,
        weights: dict[int, float] | None = None,
        shrink_towards: int = 0,
        forced: int | None = None,
        observe: bool = True,
    ) -> int:
        """Draw an integer, optionally bounded and/or weighted.

        ``forced`` pins the result to a specific (in-range) value;
        ``observe=False`` keeps the draw out of the choice sequence.
        """
        # Validate arguments
        if weights is not None:
            # Weighted draws require explicit bounds on both sides.
            assert min_value is not None
            assert max_value is not None
            assert len(weights) <= 255  # arbitrary practical limit
            # We can and should eventually support total weights. But this
            # complicates shrinking as we can no longer assume we can force
            # a value to the unmapped probability mass if that mass might be 0.
            assert sum(weights.values()) < 1
            # similarly, things get simpler if we assume every value is possible.
            # we'll want to drop this restriction eventually.
            assert all(w != 0 for w in weights.values())

        if forced is not None and min_value is not None:
            assert min_value <= forced
        if forced is not None and max_value is not None:
            assert forced <= max_value

        constraints: IntegerConstraints = self._pooled_constraints(
            "integer",
            {
                "min_value": min_value,
                "max_value": max_value,
                "weights": weights,
                "shrink_towards": shrink_towards,
            },
        )
        return self._draw("integer", constraints, observe=observe, forced=forced)

902 

    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: float | None = None,
        observe: bool = True,
    ) -> float:
        """Draw a float in [min_value, max_value], optionally allowing nan.

        ``forced`` pins the result; ``observe=False`` keeps the draw out of
        the choice sequence.
        """
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            # A forced nan is fine; otherwise the forced value must respect
            # the (sign-aware) bounds.
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)

946 

def draw_string(
    self,
    intervals: IntervalSet,
    *,
    min_size: int = 0,
    max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
    forced: str | None = None,
    observe: bool = True,
) -> str:
    """Draw a string of ``min_size`` to ``max_size`` characters, each taken
    from the codepoints in ``intervals``."""
    assert forced is None or min_size <= len(forced) <= max_size
    assert min_size >= 0
    if len(intervals) == 0:
        # With no codepoints available, only the empty string can be drawn.
        assert min_size == 0

    string_constraints: StringConstraints = self._pooled_constraints(
        "string",
        {"intervals": intervals, "min_size": min_size, "max_size": max_size},
    )
    return self._draw("string", string_constraints, observe=observe, forced=forced)

970 

def draw_bytes(
    self,
    min_size: int = 0,
    max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
    *,
    forced: bytes | None = None,
    observe: bool = True,
) -> bytes:
    """Draw a bytestring of between ``min_size`` and ``max_size`` bytes."""
    assert forced is None or min_size <= len(forced) <= max_size
    assert min_size >= 0

    bytes_constraints: BytesConstraints = self._pooled_constraints(
        "bytes",
        {"min_size": min_size, "max_size": max_size},
    )
    return self._draw("bytes", bytes_constraints, observe=observe, forced=forced)

986 

def draw_boolean(
    self,
    p: float = 0.5,
    *,
    forced: bool | None = None,
    observe: bool = True,
) -> bool:
    """Draw a boolean which is True with probability ``p``."""
    # A forced outcome must have nonzero probability under p.
    if forced is True:
        assert p > 0
    if forced is False:
        assert p < 1

    bool_constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
    return self._draw("boolean", bool_constraints, observe=observe, forced=forced)

999 

@overload
def _pooled_constraints(
    self, choice_type: Literal["integer"], constraints: IntegerConstraints
) -> IntegerConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["float"], constraints: FloatConstraints
) -> FloatConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["string"], constraints: StringConstraints
) -> StringConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["bytes"], constraints: BytesConstraints
) -> BytesConstraints: ...

@overload
def _pooled_constraints(
    self, choice_type: Literal["boolean"], constraints: BooleanConstraints
) -> BooleanConstraints: ...

def _pooled_constraints(
    self, choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> ChoiceConstraintsT:
    """Memoize common dictionary objects to reduce memory pressure.

    Returns a previously-cached constraints dict for an equivalent key when
    one exists, so repeated draws with identical constraints share a single
    dict object; otherwise caches and returns ``constraints`` unchanged.
    """
    # caching runs afoul of nondeterminism checks
    if self.provider.avoid_realization:
        return constraints

    key = (choice_type, *choice_constraints_key(choice_type, constraints))
    try:
        return POOLED_CONSTRAINTS_CACHE[key]
    except KeyError:
        POOLED_CONSTRAINTS_CACHE[key] = constraints
        return constraints

1039 

def _pop_choice(
    self,
    choice_type: ChoiceTypeT,
    constraints: ChoiceConstraintsT,
    *,
    forced: ChoiceT | None,
) -> ChoiceT:
    """Pop and return the next choice from ``self.prefix``.

    Handles ``ChoiceTemplate`` entries (expanding them to concrete values)
    and misalignments, where the strategy requests a different choice type
    or constraints than the recorded prefix value permits.
    """
    assert self.prefix is not None
    # checked in _draw
    assert self.index < len(self.prefix)

    value = self.prefix[self.index]
    if isinstance(value, ChoiceTemplate):
        node: ChoiceTemplate = value
        if node.count is not None:
            assert node.count >= 0
        # node templates have to be at the end for now, since it's not immediately
        # apparent how to handle overruning a node template while generating a single
        # node if the alternative is not "the entire data is an overrun".
        assert self.index == len(self.prefix) - 1
        if node.type == "simplest":
            if forced is not None:
                choice = forced
            else:
                # BUGFIX: previously a forced value was assigned and then
                # unconditionally overwritten by choice_from_index(0, ...),
                # making the assignment a dead store. Only fall back to the
                # simplest (index-0) choice when no value was forced.
                try:
                    choice = choice_from_index(0, choice_type, constraints)
                except ChoiceTooLarge:
                    self.mark_overrun()
        else:
            raise NotImplementedError

        if node.count is not None:
            node.count -= 1
            if node.count < 0:
                self.mark_overrun()
        return choice

    choice = value
    node_choice_type = {
        str: "string",
        float: "float",
        int: "integer",
        bool: "boolean",
        bytes: "bytes",
    }[type(choice)]
    # If we're trying to:
    # * draw a different choice type at the same location
    # * draw the same choice type with a different constraints, which does not permit
    #   the current value
    #
    # then we call this a misalignment, because the choice sequence has
    # changed from what we expected at some point. An easy misalignment is
    #
    #   one_of(integers(0, 100), integers(101, 200))
    #
    # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
    # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
    # index 1 (which does not permit any of the values 0-100).
    #
    # When the choice sequence becomes misaligned, we generate a new value of the
    # type and constraints the strategy expects.
    if node_choice_type != choice_type or not choice_permitted(choice, constraints):
        # only track first misalignment for now.
        if self.misaligned_at is None:
            self.misaligned_at = (self.index, choice_type, constraints, forced)
        try:
            # Fill in any misalignments with index 0 choices. An alternative to
            # this is using the index of the misaligned choice instead
            # of index 0, which may be useful for maintaining
            # "similarly-complex choices" in the shrinker. This requires
            # attaching an index to every choice in ConjectureData.for_choices,
            # which we don't always have (e.g. when reading from db).
            #
            # If we really wanted this in the future we could make this complexity
            # optional, use it if present, and default to index 0 otherwise.
            # This complicates our internal api and so I'd like to avoid it
            # if possible.
            #
            # Additionally, I don't think slips which require
            # slipping to high-complexity values are common. Though arguably
            # we may want to expand a bit beyond *just* the simplest choice.
            # (we could for example consider sampling choices from index 0-10).
            choice = choice_from_index(0, choice_type, constraints)
        except ChoiceTooLarge:
            # should really never happen with a 0-index choice, but let's be safe.
            self.mark_overrun()

    self.index += 1
    return choice

1128 

def as_result(self) -> ConjectureResult | _Overrun:
    """Convert the result of running this test into
    either an Overrun object or a ConjectureResult."""

    assert self.frozen
    if self.status == Status.OVERRUN:
        return Overrun
    if self.__result is None:
        # Snapshot everything later phases (shrinking, reporting, the
        # database) need, so the mutable ConjectureData can be discarded.
        # The result is cached so repeated calls return the same object.
        self.__result = ConjectureResult(
            status=self.status,
            interesting_origin=self.interesting_origin,
            spans=self.spans,
            nodes=self.nodes,
            length=self.length,
            output=self.output,
            expected_traceback=self.expected_traceback,
            expected_exception=self.expected_exception,
            has_discards=self.has_discards,
            target_observations=self.target_observations,
            tags=frozenset(self.tags),
            arg_slices=self.arg_slices,
            slice_comments=self.slice_comments,
            misaligned_at=self.misaligned_at,
            cannot_proceed_scope=self.cannot_proceed_scope,
        )
    assert self.__result is not None
    return self.__result

1156 

def __assert_not_frozen(self, name: str) -> None:
    """Raise ``Frozen`` if this ConjectureData has already been frozen."""
    if not self.frozen:
        return
    raise Frozen(f"Cannot call {name} on frozen ConjectureData")

1160 

def note(self, value: Any) -> None:
    """Append ``value`` (stringified via repr if necessary) to the output log."""
    self.__assert_not_frozen("note")
    text = value if isinstance(value, str) else repr(value)
    self.output += text

1166 

def draw(
    self,
    strategy: "SearchStrategy[Ex]",
    label: int | None = None,
    observe_as: str | None = None,
) -> "Ex":
    """Draw a value from ``strategy`` inside a fresh span.

    Top-level draws (depth 0) are additionally timed — excluding time spent
    in GC — and recorded in ``self.draw_times``; their values are stored for
    observability when it is enabled.
    """
    from hypothesis.internal.observability import observability_enabled
    from hypothesis.strategies._internal.lazy import unwrap_strategies
    from hypothesis.strategies._internal.utils import to_jsonable

    at_top_level = self.depth == 0
    start_time = None
    if at_top_level:
        # We start this timer early, because accessing attributes on a LazyStrategy
        # can be almost arbitrarily slow. In cases like characters() and text()
        # where we cache something expensive, this led to Flaky deadline errors!
        # See https://github.com/HypothesisWorks/hypothesis/issues/2108
        start_time = time.perf_counter()
        gc_start_time = gc_cumulative_time()

    strategy.validate()

    if strategy.is_empty:
        self.mark_invalid(f"empty strategy {self!r}")

    if self.depth >= MAX_DEPTH:
        self.mark_invalid("max depth exceeded")

    # Jump directly to the unwrapped strategy for the label and for do_draw.
    # This avoids adding an extra span to all lazy strategies.
    unwrapped = unwrap_strategies(strategy)
    if label is None:
        label = unwrapped.label
    assert isinstance(label, int)

    self.start_span(label=label)
    try:
        if not at_top_level:
            return unwrapped.do_draw(self)
        assert start_time is not None
        key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
        try:
            try:
                v = unwrapped.do_draw(self)
            finally:
                # Subtract the time spent in GC to avoid overcounting, as it is
                # accounted for at the overall example level.
                in_gctime = gc_cumulative_time() - gc_start_time
                self.draw_times[key] = time.perf_counter() - start_time - in_gctime
        except Exception as err:
            add_note(
                err,
                f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
            )
            raise
        if observability_enabled():
            avoid = self.provider.avoid_realization
            self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
        return v
    finally:
        self.stop_span()

1228 

def start_span(self, label: int) -> None:
    """Open a new span labelled ``label``; each call must be balanced by a
    later ``stop_span()``."""
    self.provider.span_start(label)
    self.__assert_not_frozen("start_span")
    self.depth += 1
    # Logically it would make sense for this to just be
    # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
    # be until we ran the code under tracemalloc and found a rather significant
    # chunk of allocation was happening here. This was presumably due to varargs
    # or the like, but we didn't investigate further given that it was easy
    # to fix with this check.
    if self.depth > self.max_depth:
        self.max_depth = self.depth
    self.__span_record.start_span(label)
    self.labels_for_structure_stack.append({label})

1243 

def stop_span(self, *, discard: bool = False) -> None:
    """Close the innermost open span.

    ``discard=True`` marks the span's choices as irrelevant to the result,
    which also prunes the exploration tree at this prefix (see below).
    No-op (beyond notifying the provider) once frozen.
    """
    self.provider.span_end(discard)
    if self.frozen:
        return
    if discard:
        self.has_discards = True
    self.depth -= 1
    assert self.depth >= -1
    self.__span_record.stop_span(discard=discard)

    labels_for_structure = self.labels_for_structure_stack.pop()

    if not discard:
        # Propagate this span's labels to the parent, or convert them to
        # structural-coverage tags once the outermost span closes.
        if self.labels_for_structure_stack:
            self.labels_for_structure_stack[-1].update(labels_for_structure)
        else:
            self.tags.update([structural_coverage(l) for l in labels_for_structure])

    if discard:
        # Once we've discarded a span, every test case starting with
        # this prefix contains discards. We prune the tree at that point so
        # as to avoid future test cases bothering with this region, on the
        # assumption that some span that you could have used instead
        # there would *not* trigger the discard. This greatly speeds up
        # test case generation in some cases, because it allows us to
        # ignore large swathes of the search space that are effectively
        # redundant.
        #
        # A scenario that can cause us problems but which we deliberately
        # have decided not to support is that if there are side effects
        # during data generation then you may end up with a scenario where
        # every good test case generates a discard because the discarded
        # section sets up important things for later. This is not terribly
        # likely and all that you see in this case is some degradation in
        # quality of testing, so we don't worry about it.
        #
        # Note that killing the branch does *not* mean we will never
        # explore below this point, and in particular we may do so during
        # shrinking. Any explicit request for a data object that starts
        # with the branch here will work just fine, but novel prefix
        # generation will avoid it, and we can use it to detect when we
        # have explored the entire tree (up to redundancy).

        self.observer.kill_branch()

1288 

@property
def spans(self) -> Spans:
    """The span tree of this (frozen) test case, built lazily on first access."""
    assert self.frozen
    built = self.__spans
    if built is None:
        built = Spans(record=self.__span_record)
        self.__spans = built
    return built

1295 

def freeze(self) -> None:
    """Finalize this test case: record finish times, close any open spans,
    freeze the span record, and notify the observer. Idempotent."""
    if self.frozen:
        return
    self.finish_time = time.perf_counter()
    self.gc_finish_time = gc_cumulative_time()

    # Always finish by closing all remaining spans so that we have a valid tree.
    while self.depth >= 0:
        self.stop_span()

    self.__span_record.freeze()
    self.frozen = True
    self.observer.conclude_test(self.status, self.interesting_origin)

1309 

def choice(
    self,
    values: Sequence[T],
    *,
    forced: T | None = None,
    observe: bool = True,
) -> T:
    """Draw one element of ``values``, optionally forcing a particular one."""
    forced_index = None if forced is None else values.index(forced)
    chosen = self.draw_integer(
        0,
        len(values) - 1,
        forced=forced_index,
        observe=observe,
    )
    return values[chosen]

1325 

def conclude_test(
    self,
    status: Status,
    interesting_origin: InterestingOrigin | None = None,
) -> NoReturn:
    """Freeze this data with ``status`` and abort the test via ``StopTest``."""
    # An interesting origin only makes sense for an INTERESTING result.
    assert (interesting_origin is None) or (status == Status.INTERESTING)
    self.__assert_not_frozen("conclude_test")
    self.status = status
    self.interesting_origin = interesting_origin
    self.freeze()
    raise StopTest(self.testcounter)

1337 

def mark_interesting(self, interesting_origin: InterestingOrigin) -> NoReturn:
    """Conclude this test case with Status.INTERESTING (i.e. a failure)."""
    self.conclude_test(Status.INTERESTING, interesting_origin)

1340 

def mark_invalid(self, why: str | None = None) -> NoReturn:
    """Conclude this test case with Status.INVALID, optionally recording
    ``why`` in the events dict."""
    if why is not None:
        self.events["invalid because"] = why
    self.conclude_test(Status.INVALID)

1345 

def mark_overrun(self) -> NoReturn:
    """Conclude this test case with Status.OVERRUN (ran out of choices)."""
    self.conclude_test(Status.OVERRUN)

1348 

1349 

def draw_choice(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT, *, random: Random
) -> ChoiceT:
    """Draw a single standalone value of ``choice_type`` under ``constraints``,
    using a throwaway ConjectureData seeded from ``random``."""
    data = ConjectureData(random=random)
    draw_fn = getattr(data.provider, f"draw_{choice_type}")
    return cast(ChoiceT, draw_fn(**constraints))