Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/hypothesis/internal/conjecture/data.py: 52%

596 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import math
import time
from collections import defaultdict
from collections.abc import Hashable, Iterable, Iterator, Sequence
from enum import IntEnum
from functools import cached_property
from random import Random
from typing import TYPE_CHECKING, Any, NoReturn, Optional, TypeVar, Union

import attr

from hypothesis.errors import (
    CannotProceedScopeT,
    ChoiceTooLarge,
    Frozen,
    InvalidArgument,
    StopTest,
)
from hypothesis.internal.cache import LRUCache
from hypothesis.internal.compat import add_note
from hypothesis.internal.conjecture.choice import (
    BooleanConstraints,
    BytesConstraints,
    ChoiceConstraintsT,
    ChoiceNode,
    ChoiceT,
    ChoiceTemplate,
    ChoiceTypeT,
    FloatConstraints,
    IntegerConstraints,
    StringConstraints,
    choice_constraints_key,
    choice_from_index,
    choice_permitted,
    choices_size,
)
from hypothesis.internal.conjecture.junkdrawer import IntList, gc_cumulative_time
from hypothesis.internal.conjecture.providers import (
    COLLECTION_DEFAULT_MAX_SIZE,
    HypothesisProvider,
    PrimitiveProvider,
)
from hypothesis.internal.conjecture.utils import calc_label_from_name
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.floats import (
    SMALLEST_SUBNORMAL,
    float_to_int,
    int_to_float,
    sign_aware_lte,
)
from hypothesis.internal.intervalsets import IntervalSet
from hypothesis.reporting import debug_report

if TYPE_CHECKING:
    from typing import TypeAlias

    from hypothesis.strategies import SearchStrategy
    from hypothesis.strategies._internal.strategies import Ex


def __getattr__(name: str) -> Any:
    if name == "AVAILABLE_PROVIDERS":
        from hypothesis._settings import note_deprecation
        from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS

        note_deprecation(
            "hypothesis.internal.conjecture.data.AVAILABLE_PROVIDERS has been moved to "
            "hypothesis.internal.conjecture.providers.AVAILABLE_PROVIDERS.",
            since="2025-01-25",
            has_codemod=False,
            stacklevel=1,
        )
        return AVAILABLE_PROVIDERS

    raise AttributeError(
        f"Module 'hypothesis.internal.conjecture.data' has no attribute {name}"
    )


T = TypeVar("T")
TargetObservations = dict[str, Union[int, float]]
# index, choice_type, constraints, forced value
MisalignedAt: "TypeAlias" = tuple[
    int, ChoiceTypeT, ChoiceConstraintsT, Optional[ChoiceT]
]
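# For example (illustrative only), a misalignment at choice index 1, where a
# boolean was requested but the recorded choice sequence held something else,
# might be recorded as (1, "boolean", {"p": 0.5}, None).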

TOP_LABEL = calc_label_from_name("top")


class ExtraInformation:
    """A class for holding shared state on a ``ConjectureData`` that should
    be added to the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        return "ExtraInformation({})".format(
            ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items()),
        )

    def has_information(self) -> bool:
        return bool(self.__dict__)


class Status(IntEnum):
    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return f"Status.{self.name}"


@attr.s(slots=True, frozen=True)
class StructuralCoverageTag:
    label: int = attr.ib()


STRUCTURAL_COVERAGE_CACHE: dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    try:
        return STRUCTURAL_COVERAGE_CACHE[label]
    except KeyError:
        return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))
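    # Note: the try/except pattern above means a StructuralCoverageTag is only
    # allocated on a cache miss; an unconditional setdefault would construct
    # (and usually discard) a fresh tag on every call.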


# This cache can be quite hot and so we prefer LRUCache over LRUReusedCache for
# performance. We lose scan resistance, but that's probably fine here.
POOLED_CONSTRAINTS_CACHE: LRUCache[tuple[Any, ...], ChoiceConstraintsT] = LRUCache(4096)


class Span:
    """A span tracks the hierarchical structure of choices within a single test run.

    Spans are created to mark regions of the choice sequence that are
    logically related to each other. For instance, Hypothesis tracks:
    - A single top-level span for the entire choice sequence
    - A span for the choices made by each strategy
    - Some strategies define additional spans within their choices. For instance,
      st.lists() tracks the "should add another element" choice and the "add
      another element" choices as separate spans.

    Spans provide useful information to the shrinker, mutator, targeted PBT,
    and other subsystems of Hypothesis.

    Rather than store each ``Span`` as a rich object, it is actually
    just an index into the ``Spans`` class defined below. This has two
    purposes: Firstly, for most properties of spans we will never need
    to allocate storage at all, because most properties are not used on
    most spans. Secondly, by storing the spans as compact lists
    of integers, we save a considerable amount of space compared to
    Python's normal object size.

    This does have the downside that it increases the amount of allocation
    we do, and slows things down as a result, in some usage patterns because
    we repeatedly allocate the same Span or int objects, but it will
    often dramatically reduce our memory usage, so is worth it.
    """

    __slots__ = ("index", "owner")

    def __init__(self, owner: "Spans", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Span):
            return NotImplemented
        return (self.owner is other.owner) and (self.index == other.index)

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, Span):
            return NotImplemented
        return (self.owner is not other.owner) or (self.index != other.index)

    def __repr__(self) -> str:
        return f"spans[{self.index}]"

    @property
    def label(self) -> int:
        """A label is an opaque value that associates each span with its
        approximate origin, such as a particular strategy class or a particular
        kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self) -> Optional[int]:
        """The index of the span that this one is nested directly within."""
        if self.index == 0:
            return None
        return self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
        return self.owner.ends[self.index]

    @property
    def depth(self) -> int:
        """
        Depth of this span in the span tree. The top-level span has a depth of 0.
        """
        return self.owner.depths[self.index]

    @property
    def discarded(self) -> bool:
        """True if this span's ``stop_span`` call had ``discard`` set to
        ``True``. This means we believe that the shrinker should be able to delete
        this span completely, without affecting the value produced by its enclosing
        strategy. Typically set when a rejection sampler decides to reject a
        generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def choice_count(self) -> int:
        """The number of choices in this span."""
        return self.end - self.start

    @property
    def children(self) -> "list[Span]":
        """The list of all spans with this as a parent, in increasing index
        order."""
        return [self.owner[i] for i in self.owner.children[self.index]]
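
# For example (illustrative only), one draw from st.lists(st.booleans()) could
# yield a span tree shaped roughly like:
#
#   spans[0]  TOP_LABEL            depth 0
#   spans[1]  the lists() draw     depth 1
#   spans[2]  "another element?"   depth 2
#   ...
#
# with each Span being nothing more than an index into the owning Spans object.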


class SpanProperty:
    """There are many properties of spans that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in ``SpanRecord``.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, spans: "Spans"):
        self.span_stack: list[int] = []
        self.spans = spans
        self.span_count = 0
        self.choice_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        for record in self.spans.trail:
            if record == TrailType.CHOICE:
                self.choice_count += 1
            elif record >= TrailType.START_SPAN:
                self.__push(record - TrailType.START_SPAN)
            else:
                assert record in (
                    TrailType.STOP_SPAN_DISCARD,
                    TrailType.STOP_SPAN_NO_DISCARD,
                )
                self.__pop(discarded=record == TrailType.STOP_SPAN_DISCARD)
        return self.finish()

    def __push(self, label_index: int) -> None:
        i = self.span_count
        assert i < len(self.spans)
        self.start_span(i, label_index=label_index)
        self.span_count += 1
        self.span_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        i = self.span_stack.pop()
        self.stop_span(i, discarded=discarded)

    def start_span(self, i: int, label_index: int) -> None:
        """Called at the start of each span, with ``i`` the
        index of the span and ``label_index`` the index of
        its label in ``self.spans.labels``."""

    def stop_span(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each span, with ``i`` the
        index of the span and ``discarded`` being ``True`` if ``stop_span``
        was called with ``discard=True``."""

    def finish(self) -> Any:
        raise NotImplementedError


class TrailType(IntEnum):
    STOP_SPAN_DISCARD = 1
    STOP_SPAN_NO_DISCARD = 2
    START_SPAN = 3
    CHOICE = calc_label_from_name("ir draw record")
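    # The trail encoding: values 1 and 2 are stop markers, CHOICE is a single
    # large sentinel (checked first in SpanProperty.run), and every other
    # value v >= START_SPAN encodes a span start whose label index is
    # v - START_SPAN.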


class SpanRecord:
    """Records the series of ``start_span``, ``stop_span``, and
    ``draw_bits`` calls so that these may be stored in ``Spans`` and
    replayed when we need to know about the structure of individual
    ``Span`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: list[int] = []
        self.__index_of_labels: Optional[dict[int, int]] = {}
        self.trail = IntList()
        self.nodes: list[ChoiceNode] = []

    def freeze(self) -> None:
        self.__index_of_labels = None

    def record_choice(self) -> None:
        self.trail.append(TrailType.CHOICE)

    def start_span(self, label: int) -> None:
        assert self.__index_of_labels is not None
        try:
            i = self.__index_of_labels[label]
        except KeyError:
            i = self.__index_of_labels.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(TrailType.START_SPAN + i)

    def stop_span(self, *, discard: bool) -> None:
        if discard:
            self.trail.append(TrailType.STOP_SPAN_DISCARD)
        else:
            self.trail.append(TrailType.STOP_SPAN_NO_DISCARD)


class _starts_and_ends(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.starts = IntList.of_length(len(self.spans))
        self.ends = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.starts[i] = self.choice_count

    def stop_span(self, i: int, *, discarded: bool) -> None:
        self.ends[i] = self.choice_count

    def finish(self) -> tuple[IntList, IntList]:
        return (self.starts, self.ends)


class _discarded(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result: set[int] = set()

    def finish(self) -> frozenset[int]:
        return frozenset(self.result)

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if discarded:
            self.result.add(i)


class _parentage(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def stop_span(self, i: int, *, discarded: bool) -> None:
        if i > 0:
            self.result[i] = self.span_stack[-1]

    def finish(self) -> IntList:
        return self.result


class _depths(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = len(self.span_stack)

    def finish(self) -> IntList:
        return self.result


class _label_indices(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.result = IntList.of_length(len(self.spans))

    def start_span(self, i: int, label_index: int) -> None:
        self.result[i] = label_index

    def finish(self) -> IntList:
        return self.result


class _mutator_groups(SpanProperty):
    def __init__(self, spans: "Spans") -> None:
        super().__init__(spans)
        self.groups: dict[int, set[tuple[int, int]]] = defaultdict(set)

    def start_span(self, i: int, label_index: int) -> None:
        # TODO should we discard start == end cases? occurs for eg st.data()
        # which is conditionally or never drawn from. arguably swapping
        # nodes with the empty list is a useful mutation enabled by start == end?
        key = (self.spans[i].start, self.spans[i].end)
        self.groups[label_index].add(key)

    def finish(self) -> Iterable[set[tuple[int, int]]]:
        # Discard groups with only one span, since the mutator can't
        # do anything useful with them.
        return [g for g in self.groups.values() if len(g) >= 2]


class Spans:
    """A lazy collection of ``Span`` objects, derived from
    the record of recorded behaviour in ``SpanRecord``.

    Behaves logically as if it were a list of ``Span`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Span`` and are
    described there.
    """

    def __init__(self, record: SpanRecord) -> None:
        self.trail = record.trail
        self.labels = record.labels
        self.__length = self.trail.count(
            TrailType.STOP_SPAN_DISCARD
        ) + record.trail.count(TrailType.STOP_SPAN_NO_DISCARD)
        self.__children: Optional[list[Sequence[int]]] = None

    @cached_property
    def starts_and_ends(self) -> tuple[IntList, IntList]:
        return _starts_and_ends(self).run()

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    @cached_property
    def discarded(self) -> frozenset[int]:
        return _discarded(self).run()

    @cached_property
    def parentage(self) -> IntList:
        return _parentage(self).run()

    @cached_property
    def depths(self) -> IntList:
        return _depths(self).run()

    @cached_property
    def label_indices(self) -> IntList:
        return _label_indices(self).run()

    @cached_property
    def mutator_groups(self) -> list[set[tuple[int, int]]]:
        return _mutator_groups(self).run()

    @property
    def children(self) -> list[Sequence[int]]:
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Span:
        n = self.__length
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Span(self, i)

    # not strictly necessary as we have len/getitem, but required for mypy.
    # https://github.com/python/mypy/issues/9737
    def __iter__(self) -> Iterator[Span]:
        for i in range(len(self)):
            yield self[i]


class _Overrun:
    status: Status = Status.OVERRUN

    def __repr__(self) -> str:
        return "Overrun"


Overrun = _Overrun()

global_test_counter = 0


MAX_DEPTH = 100


class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache."""

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, constraints: IntegerConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_float(
        self, value: float, *, constraints: FloatConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_string(
        self, value: str, *, constraints: StringConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_bytes(
        self, value: bytes, *, constraints: BytesConstraints, was_forced: bool
    ) -> None:
        pass

    def draw_boolean(
        self, value: bool, *, constraints: BooleanConstraints, was_forced: bool
    ) -> None:
        pass


@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    nodes: tuple[ChoiceNode, ...] = attr.ib(eq=False, repr=False)
    length: int = attr.ib()
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    expected_exception: Optional[BaseException] = attr.ib()
    expected_traceback: Optional[str] = attr.ib()
    has_discards: bool = attr.ib()
    target_observations: TargetObservations = attr.ib()
    tags: frozenset[StructuralCoverageTag] = attr.ib()
    spans: Spans = attr.ib(repr=False, eq=False)
    arg_slices: set[tuple[int, int]] = attr.ib(repr=False)
    slice_comments: dict[tuple[int, int], str] = attr.ib(repr=False)
    misaligned_at: Optional[MisalignedAt] = attr.ib(repr=False)
    cannot_proceed_scope: Optional[CannotProceedScopeT] = attr.ib(repr=False)

    def as_result(self) -> "ConjectureResult":
        return self

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return tuple(node.value for node in self.nodes)


class ConjectureData:
    @classmethod
    def for_choices(
        cls,
        choices: Sequence[Union[ChoiceTemplate, ChoiceT]],
        *,
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        random: Optional[Random] = None,
    ) -> "ConjectureData":
        from hypothesis.internal.conjecture.engine import choice_count

        return cls(
            max_choices=choice_count(choices),
            random=random,
            prefix=choices,
            observer=observer,
            provider=provider,
        )
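
    # A typical (illustrative) replay of a known choice sequence looks like:
    #
    #   data = ConjectureData.for_choices([5, True])
    #   data.draw_integer(0, 10)  # -> 5, popped from the prefix
    #   data.draw_boolean()       # -> True, popped from the prefix
    #
    # Drawing beyond the given choices marks the data as Status.OVERRUN.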

    def __init__(
        self,
        *,
        random: Optional[Random],
        observer: Optional[DataObserver] = None,
        provider: Union[type, PrimitiveProvider] = HypothesisProvider,
        prefix: Optional[Sequence[Union[ChoiceTemplate, ChoiceT]]] = None,
        max_choices: Optional[int] = None,
        provider_kw: Optional[dict[str, Any]] = None,
    ) -> None:
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        if observer is None:
            observer = DataObserver()
        if provider_kw is None:
            provider_kw = {}
        elif not isinstance(provider, type):
            raise InvalidArgument(
                f"Expected {provider=} to be a class since {provider_kw=} was "
                "passed, but got an instance instead."
            )

        assert isinstance(observer, DataObserver)
        self.observer = observer
        self.max_choices = max_choices
        self.max_length = BUFFER_SIZE
        self.is_find = False
        self.overdraw = 0
        self._random = random

        self.length = 0
        self.index = 0
        self.output = ""
        self.status = Status.VALID
        self.frozen = False
        global global_test_counter
        self.testcounter = global_test_counter
        global_test_counter += 1
        self.start_time = time.perf_counter()
        self.gc_start_time = gc_cumulative_time()
        self.events: dict[str, Union[str, int, float]] = {}
        self.interesting_origin: Optional[InterestingOrigin] = None
        self.draw_times: dict[str, float] = {}
        self._stateful_run_times: dict[str, float] = defaultdict(float)
        self.max_depth = 0
        self.has_discards = False

        self.provider: PrimitiveProvider = (
            provider(self, **provider_kw) if isinstance(provider, type) else provider
        )
        assert isinstance(self.provider, PrimitiveProvider)

        self.__result: Optional[ConjectureResult] = None

        # Observations used for targeted search. They'll be aggregated in
        # ConjectureRunner.generate_new_examples and fed to TargetSelector.
        self.target_observations: TargetObservations = {}

        # Tags which indicate something about which part of the search space
        # this example is in. These are used to guide generation.
        self.tags: set[StructuralCoverageTag] = set()
        self.labels_for_structure_stack: list[set[int]] = []

        # Normally unpopulated but we need this in the niche case
        # that self.as_result() is Overrun but we still want the
        # examples for reporting purposes.
        self.__spans: Optional[Spans] = None

        # We want the top level span to have depth 0, so we start
        # at -1.
        self.depth = -1
        self.__span_record = SpanRecord()

        # Slice indices for discrete reportable parts that which-parts-matter can
        # try varying, to report if the minimal example always fails anyway.
        self.arg_slices: set[tuple[int, int]] = set()
        self.slice_comments: dict[tuple[int, int], str] = {}
        self._observability_args: dict[str, Any] = {}
        self._observability_predicates: defaultdict = defaultdict(
            lambda: {"satisfied": 0, "unsatisfied": 0}
        )
        self._sampled_from_all_strategies_elements_message: Optional[
            tuple[str, object]
        ] = None
        self._shared_strategy_draws: dict[Hashable, Any] = {}

        self.expected_exception: Optional[BaseException] = None
        self.expected_traceback: Optional[str] = None
        self.extra_information = ExtraInformation()

        self.prefix = prefix
        self.nodes: tuple[ChoiceNode, ...] = ()
        self.misaligned_at: Optional[MisalignedAt] = None
        self.cannot_proceed_scope: Optional[CannotProceedScopeT] = None
        self.start_span(TOP_LABEL)

    def __repr__(self) -> str:
        return "ConjectureData(%s, %d choices%s)" % (
            self.status.name,
            len(self.nodes),
            ", frozen" if self.frozen else "",
        )

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return tuple(node.value for node in self.nodes)

    # draw_* functions might be called in one of two contexts: either "above" or
    # "below" the choice sequence. For instance, draw_string calls draw_boolean
    # from ``many`` when calculating the number of characters to return. We do
    # not want these choices to get written to the choice sequence, because they
    # are not true choices themselves.
    #
    # `observe` formalizes this. The choice will only be written to the choice
    # sequence if observe is True.
    def _draw(self, choice_type, constraints, *, observe, forced):
        # this is somewhat redundant with the length > max_length check at the
        # end of the function, but avoids trying to use a null self.random when
        # drawing past the node of a ConjectureData.for_choices data.
        if self.length == self.max_length:
            debug_report(f"overrun because hit {self.max_length=}")
            self.mark_overrun()
        if len(self.nodes) == self.max_choices:
            debug_report(f"overrun because hit {self.max_choices=}")
            self.mark_overrun()

        if observe and self.prefix is not None and self.index < len(self.prefix):
            value = self._pop_choice(choice_type, constraints, forced=forced)
        elif forced is None:
            value = getattr(self.provider, f"draw_{choice_type}")(**constraints)

        if forced is not None:
            value = forced

        # nan values generated via int_to_float break list membership:
        #
        # >>> n = 18444492273895866368
        # >>> assert math.isnan(int_to_float(n))
        # >>> assert int_to_float(n) not in [int_to_float(n)]
        #
        # because int_to_float nans are not equal in the sense of either
        # `a == b` or `a is b`.
        #
        # This can lead to flaky errors when collections require unique
        # floats. What was happening is that in some places we provided
        # math.nan, and in others we provided
        # int_to_float(float_to_int(math.nan)), and which one gets used
        # was not deterministic across test iterations.
        #
        # To fix this, *never* provide a nan value which is equal (via `is`) to
        # another provided nan value. This sacrifices some test power; we should
        # bring that back (ABOVE the choice sequence layer) in the future.
        #
        # See https://github.com/HypothesisWorks/hypothesis/issues/3926.
        if choice_type == "float" and math.isnan(value):
            value = int_to_float(float_to_int(value))

        if observe:
            was_forced = forced is not None
            getattr(self.observer, f"draw_{choice_type}")(
                value, constraints=constraints, was_forced=was_forced
            )
            size = 0 if self.provider.avoid_realization else choices_size([value])
            if self.length + size > self.max_length:
                debug_report(
                    f"overrun because {self.length=} + {size=} > {self.max_length=}"
                )
                self.mark_overrun()

            node = ChoiceNode(
                type=choice_type,
                value=value,
                constraints=constraints,
                was_forced=was_forced,
                index=len(self.nodes),
            )
            self.__span_record.record_choice()
            self.nodes += (node,)
            self.length += size

        return value

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> int:
        # Validate arguments
        if weights is not None:
            assert min_value is not None
            assert max_value is not None
            assert len(weights) <= 255  # arbitrary practical limit
            # We can and should eventually support total weights. But this
            # complicates shrinking as we can no longer assume we can force
            # a value to the unmapped probability mass if that mass might be 0.
            assert sum(weights.values()) < 1
            # similarly, things get simpler if we assume every value is possible.
            # we'll want to drop this restriction eventually.
            assert all(w != 0 for w in weights.values())

        if forced is not None and min_value is not None:
            assert min_value <= forced
        if forced is not None and max_value is not None:
            assert forced <= max_value

        constraints: IntegerConstraints = self._pooled_constraints(
            "integer",
            {
                "min_value": min_value,
                "max_value": max_value,
                "weights": weights,
                "shrink_towards": shrink_towards,
            },
        )
        return self._draw("integer", constraints, observe=observe, forced=forced)

    def draw_float(
        self,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        *,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL,
        # TODO: consider supporting these float widths at the choice sequence
        # level in the future.
        # width: Literal[16, 32, 64] = 64,
        forced: Optional[float] = None,
        observe: bool = True,
    ) -> float:
        assert smallest_nonzero_magnitude > 0
        assert not math.isnan(min_value)
        assert not math.isnan(max_value)

        if smallest_nonzero_magnitude == 0.0:  # pragma: no cover
            raise FloatingPointError(
                "Got allow_subnormal=True, but we can't represent subnormal floats "
                "right now, in violation of the IEEE-754 floating-point "
                "specification. This is usually because something was compiled with "
                "-ffast-math or a similar option, which sets global processor state. "
                "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed "
                "writeup - and good luck!"
            )

        if forced is not None:
            assert allow_nan or not math.isnan(forced)
            assert math.isnan(forced) or (
                sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value)
            )

        constraints: FloatConstraints = self._pooled_constraints(
            "float",
            {
                "min_value": min_value,
                "max_value": max_value,
                "allow_nan": allow_nan,
                "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
            },
        )
        return self._draw("float", constraints, observe=observe, forced=forced)

    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        forced: Optional[str] = None,
        observe: bool = True,
    ) -> str:
        assert forced is None or min_size <= len(forced) <= max_size
        assert min_size >= 0
        if len(intervals) == 0:
            assert min_size == 0

        constraints: StringConstraints = self._pooled_constraints(
            "string",
            {
                "intervals": intervals,
                "min_size": min_size,
                "max_size": max_size,
            },
        )
        return self._draw("string", constraints, observe=observe, forced=forced)

    def draw_bytes(
        self,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        *,
        forced: Optional[bytes] = None,
        observe: bool = True,
    ) -> bytes:
        assert forced is None or min_size <= len(forced) <= max_size
        assert min_size >= 0

        constraints: BytesConstraints = self._pooled_constraints(
            "bytes", {"min_size": min_size, "max_size": max_size}
        )
        return self._draw("bytes", constraints, observe=observe, forced=forced)

    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        observe: bool = True,
    ) -> bool:
        assert (forced is not True) or p > 0
        assert (forced is not False) or p < 1

        constraints: BooleanConstraints = self._pooled_constraints("boolean", {"p": p})
        return self._draw("boolean", constraints, observe=observe, forced=forced)

    def _pooled_constraints(self, choice_type, constraints):
        """Memoize common dictionary objects to reduce memory pressure."""
        # caching runs afoul of nondeterminism checks
        if self.provider.avoid_realization:
            return constraints

        key = (choice_type, *choice_constraints_key(choice_type, constraints))
        try:
            return POOLED_CONSTRAINTS_CACHE[key]
        except KeyError:
            POOLED_CONSTRAINTS_CACHE[key] = constraints
            return constraints

    def _pop_choice(
        self,
        choice_type: ChoiceTypeT,
        constraints: ChoiceConstraintsT,
        *,
        forced: Optional[ChoiceT],
    ) -> ChoiceT:
        assert self.prefix is not None
        # checked in _draw
        assert self.index < len(self.prefix)

        value = self.prefix[self.index]
        if isinstance(value, ChoiceTemplate):
            node: ChoiceTemplate = value
            if node.count is not None:
                assert node.count >= 0
            # node templates have to be at the end for now, since it's not immediately
            # apparent how to handle overrunning a node template while generating a
            # single node if the alternative is not "the entire data is an overrun".
            assert self.index == len(self.prefix) - 1
            if node.type == "simplest":
                if forced is not None:
                    choice = forced
                else:
                    try:
                        choice = choice_from_index(0, choice_type, constraints)
                    except ChoiceTooLarge:
                        self.mark_overrun()
            else:
                raise NotImplementedError

            if node.count is not None:
                node.count -= 1
                if node.count < 0:
                    self.mark_overrun()
            return choice

        choice = value
        node_choice_type = {
            str: "string",
            float: "float",
            int: "integer",
            bool: "boolean",
            bytes: "bytes",
        }[type(choice)]
        # If we're trying to:
        # * draw a different choice type at the same location
        # * draw the same choice type with different constraints, which do not
        #   permit the current value
        #
        # then we call this a misalignment, because the choice sequence has
        # changed from what we expected at some point. An easy misalignment is
        #
        #   one_of(integers(0, 100), integers(101, 200))
        #
        # where the choice sequence [0, 100] has constraints {min_value: 0, max_value: 100}
        # at index 1, but [0, 101] has constraints {min_value: 101, max_value: 200} at
        # index 1 (which does not permit any of the values 0-100).
        #
        # When the choice sequence becomes misaligned, we generate a new value of the
        # type and constraints the strategy expects.
        if node_choice_type != choice_type or not choice_permitted(choice, constraints):
            # only track first misalignment for now.
            if self.misaligned_at is None:
                self.misaligned_at = (self.index, choice_type, constraints, forced)
            try:
                # Fill in any misalignments with index 0 choices. An alternative to
                # this is using the index of the misaligned choice instead
                # of index 0, which may be useful for maintaining
                # "similarly-complex choices" in the shrinker. This requires
                # attaching an index to every choice in ConjectureData.for_choices,
                # which we don't always have (e.g. when reading from db).
                #
                # If we really wanted this in the future we could make this complexity
                # optional, use it if present, and default to index 0 otherwise.
                # This complicates our internal api and so I'd like to avoid it
                # if possible.
                #
                # Additionally, I don't think slips which require
                # slipping to high-complexity values are common. Though arguably
                # we may want to expand a bit beyond *just* the simplest choice.
                # (we could for example consider sampling choices from index 0-10).
                choice = choice_from_index(0, choice_type, constraints)
            except ChoiceTooLarge:
                # should really never happen with a 0-index choice, but let's be safe.
                self.mark_overrun()

        self.index += 1
        return choice

    def as_result(self) -> Union[ConjectureResult, _Overrun]:
        """Convert the result of running this test into
        either an Overrun object or a ConjectureResult."""

        assert self.frozen
        if self.status == Status.OVERRUN:
            return Overrun
        if self.__result is None:
            self.__result = ConjectureResult(
                status=self.status,
                interesting_origin=self.interesting_origin,
                spans=self.spans,
                nodes=self.nodes,
                length=self.length,
                output=self.output,
                expected_traceback=self.expected_traceback,
                expected_exception=self.expected_exception,
                extra_information=(
                    self.extra_information
                    if self.extra_information.has_information()
                    else None
                ),
                has_discards=self.has_discards,
                target_observations=self.target_observations,
                tags=frozenset(self.tags),
                arg_slices=self.arg_slices,
                slice_comments=self.slice_comments,
                misaligned_at=self.misaligned_at,
                cannot_proceed_scope=self.cannot_proceed_scope,
            )
        assert self.__result is not None
        return self.__result

    def __assert_not_frozen(self, name: str) -> None:
        if self.frozen:
            raise Frozen(f"Cannot call {name} on frozen ConjectureData")

    def note(self, value: Any) -> None:
        self.__assert_not_frozen("note")
        if not isinstance(value, str):
            value = repr(value)
        self.output += value

    def draw(
        self,
        strategy: "SearchStrategy[Ex]",
        label: Optional[int] = None,
        observe_as: Optional[str] = None,
    ) -> "Ex":
        from hypothesis.internal.observability import TESTCASE_CALLBACKS
        from hypothesis.strategies._internal.lazy import unwrap_strategies
        from hypothesis.strategies._internal.utils import to_jsonable

        if self.is_find and not strategy.supports_find:
            raise InvalidArgument(
                f"Cannot use strategy {strategy!r} within a call to find "
                "(presumably because it would be invalid after the call had ended)."
            )

        at_top_level = self.depth == 0
        start_time = None
        if at_top_level:
            # We start this timer early, because accessing attributes on a LazyStrategy
            # can be almost arbitrarily slow. In cases like characters() and text()
            # where we cache something expensive, this led to Flaky deadline errors!
            # See https://github.com/HypothesisWorks/hypothesis/issues/2108
            start_time = time.perf_counter()
            gc_start_time = gc_cumulative_time()

        strategy.validate()

        if strategy.is_empty:
            self.mark_invalid(f"empty strategy {self!r}")

        if self.depth >= MAX_DEPTH:
            self.mark_invalid("max depth exceeded")

        # Jump directly to the unwrapped strategy for the label and for do_draw.
        # This avoids adding an extra span to all lazy strategies.
        unwrapped = unwrap_strategies(strategy)
        if label is None:
            label = unwrapped.label
        assert isinstance(label, int)

        self.start_span(label=label)
        try:
            if not at_top_level:
                return unwrapped.do_draw(self)
            assert start_time is not None
            key = observe_as or f"generate:unlabeled_{len(self.draw_times)}"
            try:
                try:
                    v = unwrapped.do_draw(self)
                finally:
                    # Subtract the time spent in GC to avoid overcounting, as it is
                    # accounted for at the overall example level.
                    in_gctime = gc_cumulative_time() - gc_start_time
                    self.draw_times[key] = time.perf_counter() - start_time - in_gctime
            except Exception as err:
                add_note(
                    err,
                    f"while generating {key.removeprefix('generate:')!r} from {strategy!r}",
                )
                raise
            if TESTCASE_CALLBACKS:
                avoid = self.provider.avoid_realization
                self._observability_args[key] = to_jsonable(v, avoid_realization=avoid)
            return v
        finally:
            self.stop_span()

    def start_span(self, label: int) -> None:
        self.provider.span_start(label)
        self.__assert_not_frozen("start_span")
        self.depth += 1
        # Logically it would make sense for this to just be
        # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to
        # be until we ran the code under tracemalloc and found a rather significant
        # chunk of allocation was happening here. This was presumably due to varargs
        # or the like, but we didn't investigate further given that it was easy
        # to fix with this check.
        if self.depth > self.max_depth:
            self.max_depth = self.depth
        self.__span_record.start_span(label)
        self.labels_for_structure_stack.append({label})

    def stop_span(self, *, discard: bool = False) -> None:
        self.provider.span_end(discard)
        if self.frozen:
            return
        if discard:
            self.has_discards = True
        self.depth -= 1
        assert self.depth >= -1
        self.__span_record.stop_span(discard=discard)

        labels_for_structure = self.labels_for_structure_stack.pop()

        if not discard:
            if self.labels_for_structure_stack:
                self.labels_for_structure_stack[-1].update(labels_for_structure)
            else:
                self.tags.update([structural_coverage(l) for l in labels_for_structure])

        if discard:
            # Once we've discarded a span, every test case starting with
            # this prefix contains discards. We prune the tree at that point so
            # as to avoid future test cases bothering with this region, on the
            # assumption that some span that you could have used instead
            # there would *not* trigger the discard. This greatly speeds up
            # test case generation in some cases, because it allows us to
            # ignore large swathes of the search space that are effectively
            # redundant.
            #
            # A scenario that can cause us problems but which we deliberately
            # have decided not to support is that if there are side effects
            # during data generation then you may end up with a scenario where
            # every good test case generates a discard because the discarded
            # section sets up important things for later. This is not terribly
            # likely and all that you see in this case is some degradation in
            # quality of testing, so we don't worry about it.
            #
            # Note that killing the branch does *not* mean we will never
            # explore below this point, and in particular we may do so during
            # shrinking. Any explicit request for a data object that starts
            # with the branch here will work just fine, but novel prefix
            # generation will avoid it, and we can use it to detect when we
            # have explored the entire tree (up to redundancy).

            self.observer.kill_branch()

    @property
    def spans(self) -> Spans:
        assert self.frozen
        if self.__spans is None:
            self.__spans = Spans(record=self.__span_record)
        return self.__spans

    def freeze(self) -> None:
        if self.frozen:
            return
        self.finish_time = time.perf_counter()
        self.gc_finish_time = gc_cumulative_time()

        # Always finish by closing all remaining spans so that we have a valid tree.
        while self.depth >= 0:
            self.stop_span()

        self.__span_record.freeze()
        self.frozen = True
        self.observer.conclude_test(self.status, self.interesting_origin)

    def choice(
        self,
        values: Sequence[T],
        *,
        forced: Optional[T] = None,
        observe: bool = True,
    ) -> T:
        forced_i = None if forced is None else values.index(forced)
        i = self.draw_integer(
            0,
            len(values) - 1,
            forced=forced_i,
            observe=observe,
        )
        return values[i]
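
    # For example (illustrative), data.choice(["red", "green", "blue"]) draws
    # an index in [0, 2] and returns that element; since draw_integer shrinks
    # towards 0 by default, "red" is the simplest outcome here.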

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin] = None,
    ) -> NoReturn:
        assert (interesting_origin is None) or (status == Status.INTERESTING)
        self.__assert_not_frozen("conclude_test")
        self.interesting_origin = interesting_origin
        self.status = status
        self.freeze()
        raise StopTest(self.testcounter)

    def mark_interesting(
        self, interesting_origin: Optional[InterestingOrigin] = None
    ) -> NoReturn:
        self.conclude_test(Status.INTERESTING, interesting_origin)

    def mark_invalid(self, why: Optional[str] = None) -> NoReturn:
        if why is not None:
            self.events["invalid because"] = why
        self.conclude_test(Status.INVALID)

    def mark_overrun(self) -> NoReturn:
        self.conclude_test(Status.OVERRUN)


def draw_choice(choice_type, constraints, *, random):
    cd = ConjectureData(random=random)
    return getattr(cd.provider, f"draw_{choice_type}")(**constraints)
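

# A minimal usage sketch (illustrative only): drawing a single value outside
# of any test run, with the same constraint keys draw_integer builds above.
#
#   from random import Random
#   draw_choice(
#       "integer",
#       {"min_value": 0, "max_value": 10, "weights": None, "shrink_towards": 0},
#       random=Random(0),
#   )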