Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/internal/conjecture/data.py: 62%

1166 statements

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import contextlib
import math
import time
from collections import defaultdict
from enum import IntEnum
from random import Random
from sys import float_info
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    DefaultDict,
    Dict,
    FrozenSet,
    Iterable,
    Iterator,
    List,
    Literal,
    NoReturn,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    TypedDict,
    TypeVar,
    Union,
)

import attr

from hypothesis.errors import Frozen, InvalidArgument, StopTest
from hypothesis.internal.cache import LRUReusedCache
from hypothesis.internal.compat import add_note, floor, int_from_bytes, int_to_bytes
from hypothesis.internal.conjecture.floats import float_to_lex, lex_to_float
from hypothesis.internal.conjecture.junkdrawer import (
    IntList,
    gc_cumulative_time,
    uniform,
)
from hypothesis.internal.conjecture.utils import (
    INT_SIZES,
    INT_SIZES_SAMPLER,
    Sampler,
    calc_label_from_name,
    many,
)
from hypothesis.internal.floats import (
    SIGNALING_NAN,
    SMALLEST_SUBNORMAL,
    float_to_int,
    int_to_float,
    make_float_clamper,
    next_down,
    next_up,
    sign_aware_lte,
)
from hypothesis.internal.intervalsets import IntervalSet

if TYPE_CHECKING:
    from typing import TypeAlias

    from typing_extensions import dataclass_transform

    from hypothesis.strategies import SearchStrategy
    from hypothesis.strategies._internal.strategies import Ex
else:
    TypeAlias = object

    def dataclass_transform():
        def wrapper(tp):
            return tp

        return wrapper


TOP_LABEL = calc_label_from_name("top")
InterestingOrigin = Tuple[
    Type[BaseException], str, int, Tuple[Any, ...], Tuple[Tuple[Any, ...], ...]
]
TargetObservations = Dict[str, Union[int, float]]

T = TypeVar("T")


class IntegerKWargs(TypedDict):
    min_value: Optional[int]
    max_value: Optional[int]
    weights: Optional[Sequence[float]]
    shrink_towards: int


class FloatKWargs(TypedDict):
    min_value: float
    max_value: float
    allow_nan: bool
    smallest_nonzero_magnitude: float


class StringKWargs(TypedDict):
    intervals: IntervalSet
    min_size: int
    max_size: Optional[int]


class BytesKWargs(TypedDict):
    size: int


class BooleanKWargs(TypedDict):
    p: float


IRType: TypeAlias = Union[int, str, bool, float, bytes]
IRKWargsType: TypeAlias = Union[
    IntegerKWargs, FloatKWargs, StringKWargs, BytesKWargs, BooleanKWargs
]
IRTypeName: TypeAlias = Literal["integer", "string", "boolean", "float", "bytes"]
# ir_type, kwargs, forced
InvalidAt: TypeAlias = Tuple[IRTypeName, IRKWargsType, Optional[IRType]]


class ExtraInformation:
    """A class for holding shared state on a ``ConjectureData`` that should
    be added to the final ``ConjectureResult``."""

    def __repr__(self) -> str:
        return "ExtraInformation({})".format(
            ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items()),
        )

    def has_information(self) -> bool:
        return bool(self.__dict__)


class Status(IntEnum):
    OVERRUN = 0
    INVALID = 1
    VALID = 2
    INTERESTING = 3

    def __repr__(self) -> str:
        return f"Status.{self.name}"


@dataclass_transform()
@attr.s(frozen=True, slots=True, auto_attribs=True)
class StructuralCoverageTag:
    label: int


STRUCTURAL_COVERAGE_CACHE: Dict[int, StructuralCoverageTag] = {}


def structural_coverage(label: int) -> StructuralCoverageTag:
    try:
        return STRUCTURAL_COVERAGE_CACHE[label]
    except KeyError:
        return STRUCTURAL_COVERAGE_CACHE.setdefault(label, StructuralCoverageTag(label))


NASTY_FLOATS = sorted(
    [
        0.0,
        0.5,
        1.1,
        1.5,
        1.9,
        1.0 / 3,
        10e6,
        10e-6,
        1.175494351e-38,
        next_up(0.0),
        float_info.min,
        float_info.max,
        3.402823466e38,
        9007199254740992,
        1 - 10e-6,
        2 + 10e-6,
        1.192092896e-07,
        2.2204460492503131e-016,
    ]
    + [2.0**-n for n in (24, 14, 149, 126)]  # minimum (sub)normals for float16,32
    + [float_info.min / n for n in (2, 10, 1000, 100_000)]  # subnormal in float64
    + [math.inf, math.nan] * 5
    + [SIGNALING_NAN],
    key=float_to_lex,
)
NASTY_FLOATS = list(map(float, NASTY_FLOATS))
NASTY_FLOATS.extend([-x for x in NASTY_FLOATS])

FLOAT_INIT_LOGIC_CACHE = LRUReusedCache(4096)

POOLED_KWARGS_CACHE = LRUReusedCache(4096)

DRAW_STRING_DEFAULT_MAX_SIZE = 10**10  # "arbitrarily large"


class Example:
    """Examples track the hierarchical structure of draws from the byte stream,
    within a single test run.

    Examples are created to mark regions of the byte stream that might be
    useful to the shrinker, such as:
    - The bytes used by a single draw from a strategy.
    - Useful groupings within a strategy, such as individual list elements.
    - Strategy-like helper functions that aren't first-class strategies.
    - Each lowest-level draw of bits or bytes from the byte stream.
    - A single top-level example that spans the entire input.

    Example-tracking allows the shrinker to try "high-level" transformations,
    such as rearranging or deleting the elements of a list, without having
    to understand their exact representation in the byte stream.

    Rather than store each ``Example`` as a rich object, it is actually
    just an index into the ``Examples`` class defined below. This has two
    purposes: Firstly, for most properties of examples we will never need
    to allocate storage at all, because most properties are not used on
    most examples. Secondly, by storing the properties as compact lists
    of integers, we save a considerable amount of space compared to
    Python's normal object size.

    This does have the downside that it increases the amount of allocation
    we do, and slows things down as a result, in some usage patterns because
    we repeatedly allocate the same Example or int objects, but it will
    often dramatically reduce our memory usage, so is worth it.
    """
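    # An illustrative sketch, not part of the class: given a populated
    # ``Examples`` collection ``exs`` (a hypothetical variable name),
    #
    #     ex = exs[3]       # allocates only a tiny (owner, index) pair
    #     ex.start, ex.end  # read exs.starts[3] and exs.ends[3]
    #     ex.children       # looks up exs.children[3]
    #
    # so all per-example data lives in the owner's compact backing lists.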

    __slots__ = ("owner", "index")

    def __init__(self, owner: "Examples", index: int) -> None:
        self.owner = owner
        self.index = index

    def __eq__(self, other: object) -> bool:
        if self is other:
            return True
        if not isinstance(other, Example):
            return NotImplemented
        return (self.owner is other.owner) and (self.index == other.index)

    def __ne__(self, other: object) -> bool:
        if self is other:
            return False
        if not isinstance(other, Example):
            return NotImplemented
        return (self.owner is not other.owner) or (self.index != other.index)

    def __repr__(self) -> str:
        return f"examples[{self.index}]"

    @property
    def label(self) -> int:
        """A label is an opaque value that associates each example with its
        approximate origin, such as a particular strategy class or a particular
        kind of draw."""
        return self.owner.labels[self.owner.label_indices[self.index]]

    @property
    def parent(self):
        """The index of the example that this one is nested directly within."""
        if self.index == 0:
            return None
        return self.owner.parentage[self.index]

    @property
    def start(self) -> int:
        """The position of the start of this example in the byte stream."""
        return self.owner.starts[self.index]

    @property
    def end(self) -> int:
284 """The position directly after the last byte in this byte stream. 

285 i.e. the example corresponds to the half open region [start, end). 

286 """ 

        return self.owner.ends[self.index]

    @property
    def ir_start(self) -> int:
        return self.owner.ir_starts[self.index]

    @property
    def ir_end(self) -> int:
        return self.owner.ir_ends[self.index]

    @property
    def depth(self):
        """Depth of this example in the example tree. The top-level example has a
        depth of 0."""
        return self.owner.depths[self.index]

    @property
    def trivial(self):
        """An example is "trivial" if it only contains forced bytes and zero bytes.
        All examples start out as trivial, and then get marked non-trivial when
        we see a byte that is neither forced nor zero."""
        return self.index in self.owner.trivial

    @property
    def discarded(self) -> bool:
312 """True if this is example's ``stop_example`` call had ``discard`` set to 

        ``True``. This means we believe that the shrinker should be able to delete
        this example completely, without affecting the value produced by its enclosing
        strategy. Typically set when a rejection sampler decides to reject a
        generated value and try again."""
        return self.index in self.owner.discarded

    @property
    def length(self) -> int:
        """The number of bytes in this example."""
        return self.end - self.start

    @property
    def ir_length(self) -> int:
        """The number of ir nodes in this example."""
        return self.ir_end - self.ir_start

    @property
    def children(self) -> "List[Example]":
        """The list of all examples with this as a parent, in increasing index
        order."""
        return [self.owner[i] for i in self.owner.children[self.index]]


class ExampleProperty:
    """There are many properties of examples that we calculate by
    essentially rerunning the test case multiple times based on the
    calls which we record in ExampleRecord.

    This class defines a visitor, subclasses of which can be used
    to calculate these properties.
    """

    def __init__(self, examples: "Examples"):
        self.example_stack: "List[int]" = []
        self.examples = examples
        self.bytes_read = 0
        self.example_count = 0
        self.block_count = 0
        self.ir_node_count = 0

    def run(self) -> Any:
        """Rerun the test case with this visitor and return the
        results of ``self.finish()``."""
        self.begin()
        blocks = self.examples.blocks
        for record in self.examples.trail:
            if record == DRAW_BITS_RECORD:
                self.bytes_read = blocks.endpoints[self.block_count]
                self.block(self.block_count)
                self.block_count += 1
            elif record == IR_NODE_RECORD:
                data = self.examples.ir_nodes[self.ir_node_count]
                self.ir_node(data)
                self.ir_node_count += 1
            elif record >= START_EXAMPLE_RECORD:
                self.__push(record - START_EXAMPLE_RECORD)
            else:
                assert record in (
                    STOP_EXAMPLE_DISCARD_RECORD,
                    STOP_EXAMPLE_NO_DISCARD_RECORD,
                )
                self.__pop(discarded=record == STOP_EXAMPLE_DISCARD_RECORD)
        return self.finish()

    def __push(self, label_index: int) -> None:
        i = self.example_count
        assert i < len(self.examples)
        self.start_example(i, label_index=label_index)
        self.example_count += 1
        self.example_stack.append(i)

    def __pop(self, *, discarded: bool) -> None:
        i = self.example_stack.pop()
        self.stop_example(i, discarded=discarded)

    def begin(self) -> None:
        """Called at the beginning of the run to initialise any
        relevant state."""
        self.result = IntList.of_length(len(self.examples))

    def start_example(self, i: int, label_index: int) -> None:
        """Called at the start of each example, with ``i`` the
        index of the example and ``label_index`` the index of
        its label in ``self.examples.labels``."""

    def block(self, i: int) -> None:
        """Called with each ``draw_bits`` call, with ``i`` the index of the
        corresponding block in ``self.examples.blocks``"""

    def stop_example(self, i: int, *, discarded: bool) -> None:
        """Called at the end of each example, with ``i`` the
        index of the example and ``discarded`` being ``True`` if ``stop_example``
        was called with ``discard=True``."""

    def ir_node(self, node: "IRNode") -> None:
        """Called when an ir node is drawn."""

    def finish(self) -> Any:
        return self.result


def calculated_example_property(cls: Type[ExampleProperty]) -> Any:
    """Given an ``ExampleProperty`` as above we use this decorator
    to transform it into a lazy property on the ``Examples`` class,
    which has as its value the result of calling ``cls.run()``,
    computed the first time the property is accessed.

    This has the slightly weird result that we are defining nested
    classes which get turned into properties."""
    name = cls.__name__
    cache_name = "__" + name

    def lazy_calculate(self: "Examples") -> IntList:
        result = getattr(self, cache_name, None)
        if result is None:
            result = cls(self).run()
            setattr(self, cache_name, result)
        return result

    lazy_calculate.__name__ = cls.__name__
    lazy_calculate.__qualname__ = cls.__qualname__
    return property(lazy_calculate)
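# This is exactly how the lazy properties on ``Examples`` below are wired up;
# e.g. ``Examples._depths`` is an ``ExampleProperty`` subclass passed through
# this decorator, so that (illustratively)
#
#     examples.depths  # first access: runs _depths(examples).run() and caches
#     examples.depths  # later accesses: return the cached IntList
#
# replays the recorded trail at most once per Examples instance.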

DRAW_BITS_RECORD = 0
STOP_EXAMPLE_DISCARD_RECORD = 1
STOP_EXAMPLE_NO_DISCARD_RECORD = 2
START_EXAMPLE_RECORD = 3

IR_NODE_RECORD = calc_label_from_name("ir draw record")


class ExampleRecord:
    """Records the series of ``start_example``, ``stop_example``, and
    ``draw_bits`` calls so that these may be stored in ``Examples`` and
    replayed when we need to know about the structure of individual
    ``Example`` objects.

    Note that there is significant similarity between this class and
    ``DataObserver``, and the plan is to eventually unify them, but
    they currently have slightly different functions and implementations.
    """

    def __init__(self) -> None:
        self.labels: List[int] = []
        self.__index_of_labels: "Optional[Dict[int, int]]" = {}
        self.trail = IntList()
        self.ir_nodes: List[IRNode] = []

    def freeze(self) -> None:
        self.__index_of_labels = None

    def record_ir_draw(self, ir_type, value, *, kwargs, was_forced):
        self.trail.append(IR_NODE_RECORD)
        node = IRNode(
            ir_type=ir_type,
            value=value,
            kwargs=kwargs,
            was_forced=was_forced,
            index=len(self.ir_nodes),
        )
        self.ir_nodes.append(node)

    def start_example(self, label: int) -> None:
        assert self.__index_of_labels is not None
        try:
            i = self.__index_of_labels[label]
        except KeyError:
            i = self.__index_of_labels.setdefault(label, len(self.labels))
            self.labels.append(label)
        self.trail.append(START_EXAMPLE_RECORD + i)

    def stop_example(self, *, discard: bool) -> None:
        if discard:
            self.trail.append(STOP_EXAMPLE_DISCARD_RECORD)
        else:
            self.trail.append(STOP_EXAMPLE_NO_DISCARD_RECORD)

    def draw_bits(self) -> None:
        self.trail.append(DRAW_BITS_RECORD)
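# An illustrative sketch of the trail encoding (not executed here):
#
#     rec = ExampleRecord()
#     rec.start_example(TOP_LABEL)  # first unseen label -> label index 0
#     rec.draw_bits()
#     rec.stop_example(discard=False)
#
# leaves ``rec.trail`` as [START_EXAMPLE_RECORD + 0, DRAW_BITS_RECORD,
# STOP_EXAMPLE_NO_DISCARD_RECORD], i.e. [3, 0, 2], which
# ``ExampleProperty.run`` later replays as push / block / pop events.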

class Examples:
    """A lazy collection of ``Example`` objects, derived from
    the record of recorded behaviour in ``ExampleRecord``.

    Behaves logically as if it were a list of ``Example`` objects,
    but actually mostly exists as a compact store of information
    for them to reference into. All properties on here are best
    understood as the backing storage for ``Example`` and are
    described there.
    """

    def __init__(self, record: ExampleRecord, blocks: "Blocks") -> None:
        self.trail = record.trail
        self.ir_nodes = record.ir_nodes
        self.labels = record.labels
        self.__length = self.trail.count(
            STOP_EXAMPLE_DISCARD_RECORD
        ) + record.trail.count(STOP_EXAMPLE_NO_DISCARD_RECORD)
        self.blocks = blocks
        self.__children: "Optional[List[Sequence[int]]]" = None

    class _starts_and_ends(ExampleProperty):
        def begin(self):
            self.starts = IntList.of_length(len(self.examples))
            self.ends = IntList.of_length(len(self.examples))

        def start_example(self, i: int, label_index: int) -> None:
            self.starts[i] = self.bytes_read

        def stop_example(self, i: int, *, discarded: bool) -> None:
            self.ends[i] = self.bytes_read

        def finish(self) -> Tuple[IntList, IntList]:
            return (self.starts, self.ends)

    starts_and_ends: "Tuple[IntList, IntList]" = calculated_example_property(
        _starts_and_ends
    )

    @property
    def starts(self) -> IntList:
        return self.starts_and_ends[0]

    @property
    def ends(self) -> IntList:
        return self.starts_and_ends[1]

    class _ir_starts_and_ends(ExampleProperty):
        def begin(self):
            self.starts = IntList.of_length(len(self.examples))
            self.ends = IntList.of_length(len(self.examples))

        def start_example(self, i: int, label_index: int) -> None:
            self.starts[i] = self.ir_node_count

        def stop_example(self, i: int, *, discarded: bool) -> None:
            self.ends[i] = self.ir_node_count

        def finish(self) -> Tuple[IntList, IntList]:
            return (self.starts, self.ends)

    ir_starts_and_ends: "Tuple[IntList, IntList]" = calculated_example_property(
        _ir_starts_and_ends
    )

    @property
    def ir_starts(self) -> IntList:
        return self.ir_starts_and_ends[0]

    @property
    def ir_ends(self) -> IntList:
        return self.ir_starts_and_ends[1]

    class _discarded(ExampleProperty):
        def begin(self) -> None:
            self.result: "Set[int]" = set()  # type: ignore  # IntList in parent class

        def finish(self) -> FrozenSet[int]:
            return frozenset(self.result)

        def stop_example(self, i: int, *, discarded: bool) -> None:
            if discarded:
                self.result.add(i)

    discarded: FrozenSet[int] = calculated_example_property(_discarded)

    class _trivial(ExampleProperty):
        def begin(self) -> None:
            self.nontrivial = IntList.of_length(len(self.examples))
            self.result: "Set[int]" = set()  # type: ignore  # IntList in parent class

        def block(self, i: int) -> None:
            if not self.examples.blocks.trivial(i):
                self.nontrivial[self.example_stack[-1]] = 1

        def stop_example(self, i: int, *, discarded: bool) -> None:
            if self.nontrivial[i]:
                if self.example_stack:
                    self.nontrivial[self.example_stack[-1]] = 1
                else:
                    self.result.add(i)

        def finish(self) -> FrozenSet[int]:
            return frozenset(self.result)

    trivial: FrozenSet[int] = calculated_example_property(_trivial)

    class _parentage(ExampleProperty):
        def stop_example(self, i: int, *, discarded: bool) -> None:
            if i > 0:
                self.result[i] = self.example_stack[-1]

    parentage: IntList = calculated_example_property(_parentage)

    class _depths(ExampleProperty):
        def begin(self):
            self.result = IntList.of_length(len(self.examples))

        def start_example(self, i: int, label_index: int) -> None:
            self.result[i] = len(self.example_stack)

    depths: IntList = calculated_example_property(_depths)

    class _ir_tree_nodes(ExampleProperty):
        def begin(self):
            self.result = []

        def ir_node(self, ir_node):
            self.result.append(ir_node)

    ir_tree_nodes: "List[IRNode]" = calculated_example_property(_ir_tree_nodes)

    class _label_indices(ExampleProperty):
        def start_example(self, i: int, label_index: int) -> None:
            self.result[i] = label_index

    label_indices: IntList = calculated_example_property(_label_indices)

    class _mutator_groups(ExampleProperty):
        def begin(self) -> None:
            self.groups: "Dict[int, Set[Tuple[int, int]]]" = defaultdict(set)

        def start_example(self, i: int, label_index: int) -> None:
            # TODO should we discard start == end cases? occurs for eg st.data()
            # which is conditionally or never drawn from. arguably swapping
            # nodes with the empty list is a useful mutation enabled by start == end?
            key = (self.examples[i].ir_start, self.examples[i].ir_end)
            self.groups[label_index].add(key)

        def finish(self) -> Iterable[Set[Tuple[int, int]]]:
            # Discard groups with only one example, since the mutator can't
            # do anything useful with them.
            return [g for g in self.groups.values() if len(g) >= 2]

    mutator_groups: List[Set[Tuple[int, int]]] = calculated_example_property(
        _mutator_groups
    )

    @property
    def children(self) -> List[Sequence[int]]:
        if self.__children is None:
            children = [IntList() for _ in range(len(self))]
            for i, p in enumerate(self.parentage):
                if i > 0:
                    children[p].append(i)
            # Replace empty children lists with a tuple to reduce
            # memory usage.
            for i, c in enumerate(children):
                if not c:
                    children[i] = ()  # type: ignore
            self.__children = children  # type: ignore
        return self.__children  # type: ignore

    def __len__(self) -> int:
        return self.__length

    def __getitem__(self, i: int) -> Example:
        assert isinstance(i, int)
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return Example(self, i)


@dataclass_transform()
@attr.s(slots=True, frozen=True)
class Block:
    """Blocks track the flat list of lowest-level draws from the byte stream,
    within a single test run.

    Block-tracking allows the shrinker to try "low-level"
    transformations, such as minimizing the numeric value of an
    individual call to ``draw_bits``.
    """

    start: int = attr.ib()
    end: int = attr.ib()

    # Index of this block inside the overall list of blocks.
    index: int = attr.ib()

    # True if this block's byte values were forced by a write operation.
    # As long as the bytes before this block remain the same, modifying this
    # block's bytes will have no effect.
    forced: bool = attr.ib(repr=False)

    # True if this block's byte values are all 0. Reading this flag can be
    # more convenient than explicitly checking a slice for non-zero bytes.
    all_zero: bool = attr.ib(repr=False)

    @property
    def bounds(self) -> Tuple[int, int]:
        return (self.start, self.end)

    @property
    def length(self) -> int:
        return self.end - self.start

    @property
    def trivial(self) -> bool:
        return self.forced or self.all_zero


class Blocks:
    """A lazily calculated list of blocks for a particular ``ConjectureResult``
    or ``ConjectureData`` object.

    Pretends to be a list containing ``Block`` objects but actually only
    contains their endpoints right up until the point where you want to
    access the actual block, at which point it is constructed.

    This is designed to be as space efficient as possible, so will at
    various points silently transform its representation into one
    that is better suited for the current access pattern.

    In addition, it has a number of convenience methods for accessing
    properties of the block object at index ``i`` that should generally
    be preferred to using the Block objects directly, as it will not
    have to allocate the actual object."""
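    # An illustrative sketch (assumed data, not part of the class): with
    # ``endpoints == [2, 5, 9]`` this behaves like three blocks spanning
    # [0, 2), [2, 5) and [5, 9); ``start(i)`` is just ``endpoints[i - 1]``
    # (or 0 for i == 0), so no ``Block`` object exists until ``self[i]``
    # is actually accessed.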

    __slots__ = ("endpoints", "owner", "__blocks", "__count", "__sparse")
    owner: "Union[ConjectureData, ConjectureResult, None]"
    __blocks: Union[Dict[int, Block], List[Optional[Block]]]

    def __init__(self, owner: "ConjectureData") -> None:
        self.owner = owner
        self.endpoints = IntList()
        self.__blocks = {}
        self.__count = 0
        self.__sparse = True

    def add_endpoint(self, n: int) -> None:
        """Add n to the list of endpoints."""
        assert isinstance(self.owner, ConjectureData)
        self.endpoints.append(n)

    def transfer_ownership(self, new_owner: "ConjectureResult") -> None:
754 """Used to move ``Blocks`` over to a ``ConjectureResult`` object 

755 when that is read to be used and we no longer want to keep the 

756 whole ``ConjectureData`` around.""" 

        assert isinstance(new_owner, ConjectureResult)
        self.owner = new_owner
        self.__check_completion()

    def start(self, i: int) -> int:
        """Equivalent to self[i].start."""
        i = self._check_index(i)

        if i == 0:
            return 0
        else:
            return self.end(i - 1)

    def end(self, i: int) -> int:
        """Equivalent to self[i].end."""
        return self.endpoints[i]

    def all_bounds(self) -> Iterable[Tuple[int, int]]:
        """Equivalent to [(b.start, b.end) for b in self]."""
        prev = 0
        for e in self.endpoints:
            yield (prev, e)
            prev = e

    @property
    def last_block_length(self):
        return self.end(-1) - self.start(-1)

    def __len__(self) -> int:
        return len(self.endpoints)

    def __known_block(self, i: int) -> Optional[Block]:
        try:
            return self.__blocks[i]
        except (KeyError, IndexError):
            return None

    def trivial(self, i: int) -> Any:
        """Equivalent to self.blocks[i].trivial."""
        if self.owner is not None:
            return self.start(i) in self.owner.forced_indices or not any(
                self.owner.buffer[self.start(i) : self.end(i)]
            )
        else:
            return self[i].trivial

    def _check_index(self, i: int) -> int:
        n = len(self)
        if i < -n or i >= n:
            raise IndexError(f"Index {i} out of range [-{n}, {n})")
        if i < 0:
            i += n
        return i

    def __getitem__(self, i: int) -> Block:
        i = self._check_index(i)
        assert i >= 0
        result = self.__known_block(i)
        if result is not None:
            return result

        # We store the blocks as a sparse dict mapping indices to the
        # actual result, but this isn't the best representation once we
        # stop being sparse and want to use most of the blocks. Switch
        # over to a list at that point.
        if self.__sparse and len(self.__blocks) * 2 >= len(self):
            new_blocks: "List[Optional[Block]]" = [None] * len(self)
            assert isinstance(self.__blocks, dict)
            for k, v in self.__blocks.items():
                new_blocks[k] = v
            self.__sparse = False
            self.__blocks = new_blocks
            assert self.__blocks[i] is None

        start = self.start(i)
        end = self.end(i)

        # We keep track of the number of blocks that have actually been
        # instantiated so that when every block that could be instantiated
        # has been we know that the list is complete and can throw away
        # some data that we no longer need.
        self.__count += 1

        # Integrity check: We can't have allocated more blocks than we have
        # positions for blocks.
        assert self.__count <= len(self)
        assert self.owner is not None
        result = Block(
            start=start,
            end=end,
            index=i,
            forced=start in self.owner.forced_indices,
            all_zero=not any(self.owner.buffer[start:end]),
        )
        try:
            self.__blocks[i] = result
        except IndexError:
            assert isinstance(self.__blocks, list)
            assert len(self.__blocks) < len(self)
            self.__blocks.extend([None] * (len(self) - len(self.__blocks)))
            self.__blocks[i] = result

        self.__check_completion()

        return result

    def __check_completion(self):
864 """The list of blocks is complete if we have created every ``Block`` 

865 object that we currently good and know that no more will be created. 

866 

867 If this happens then we don't need to keep the reference to the 

868 owner around, and delete it so that there is no circular reference. 

869 The main benefit of this is that the gc doesn't need to run to collect 

870 this because normal reference counting is enough. 

871 """ 

        if self.__count == len(self) and isinstance(self.owner, ConjectureResult):
            self.owner = None

    def __iter__(self) -> Iterator[Block]:
        for i in range(len(self)):
            yield self[i]

    def __repr__(self) -> str:
        parts: "List[str]" = []
        for i in range(len(self)):
            b = self.__known_block(i)
            if b is None:
                parts.append("...")
            else:
                parts.append(repr(b))
887 return "Block([{}])".format(", ".join(parts)) 

class _Overrun:
    status = Status.OVERRUN

    def __repr__(self):
        return "Overrun"


Overrun = _Overrun()

global_test_counter = 0


MAX_DEPTH = 100


class DataObserver:
    """Observer class for recording the behaviour of a
    ConjectureData object, primarily used for tracking
    the behaviour in the tree cache."""

    def conclude_test(
        self,
        status: Status,
        interesting_origin: Optional[InterestingOrigin],
    ) -> None:
        """Called when ``conclude_test`` is called on the
        observed ``ConjectureData``, with the same arguments.

        Note that this is called after ``freeze`` has completed.
        """

    def kill_branch(self) -> None:
        """Mark this part of the tree as not worth re-exploring."""

    def draw_integer(
        self, value: int, *, kwargs: IntegerKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_float(
        self, value: float, *, kwargs: FloatKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_string(
        self, value: str, *, kwargs: StringKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_bytes(
        self, value: bytes, *, kwargs: BytesKWargs, was_forced: bool
    ) -> None:
        pass

    def draw_boolean(
        self, value: bool, *, kwargs: BooleanKWargs, was_forced: bool
    ) -> None:
        pass

    def mark_invalid(self, invalid_at: InvalidAt) -> None:
        pass


@attr.s(slots=True, repr=False, eq=False)
class IRNode:
    ir_type: IRTypeName = attr.ib()
    value: IRType = attr.ib()
    kwargs: IRKWargsType = attr.ib()
    was_forced: bool = attr.ib()
    index: Optional[int] = attr.ib(default=None)

    def copy(
        self,
        *,
        with_value: Optional[IRType] = None,
        with_kwargs: Optional[IRKWargsType] = None,
    ) -> "IRNode":
        # we may want to allow this combination in the future, but for now it's
        # a footgun.
        assert not self.was_forced, "modifying a forced node doesn't make sense"
        # explicitly not copying index. node indices are only assigned via
        # ExampleRecord. This prevents footguns with relying on stale indices
        # after copying.
        return IRNode(
            ir_type=self.ir_type,
            value=self.value if with_value is None else with_value,
            kwargs=self.kwargs if with_kwargs is None else with_kwargs,
            was_forced=self.was_forced,
        )

    @property
    def trivial(self):
        """
        A node is trivial if it cannot be simplified any further. This does not
        mean that modifying a trivial node can't produce simpler test cases when
        viewing the tree as a whole. Just that when viewing this node in
        isolation, this is the simplest the node can get.
        """
        if self.was_forced:
            return True

        if self.ir_type == "integer":
            shrink_towards = self.kwargs["shrink_towards"]
            min_value = self.kwargs["min_value"]
            max_value = self.kwargs["max_value"]

            if min_value is not None:
                shrink_towards = max(min_value, shrink_towards)
            if max_value is not None:
                shrink_towards = min(max_value, shrink_towards)

            return self.value == shrink_towards
        if self.ir_type == "float":
            min_value = self.kwargs["min_value"]
            max_value = self.kwargs["max_value"]
            shrink_towards = 0

            if min_value == -math.inf and max_value == math.inf:
                return ir_value_equal("float", self.value, shrink_towards)

            if (
                not math.isinf(min_value)
                and not math.isinf(max_value)
                and math.ceil(min_value) <= math.floor(max_value)
            ):
                # the interval contains an integer. the simplest integer is the
                # one closest to shrink_towards
                shrink_towards = max(math.ceil(min_value), shrink_towards)
                shrink_towards = min(math.floor(max_value), shrink_towards)
                return ir_value_equal("float", self.value, shrink_towards)
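            # e.g. (illustrative): min_value=2.5, max_value=5.5 contains the
            # integers 3..5, so shrink_towards is clamped from 0 up to 3 and
            # the node is trivial iff its value is 3.0.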

            # the real answer here is "the value in [min_value, max_value] with
            # the lowest denominator when represented as a fraction".
            # It would be good to compute this correctly in the future, but it's
            # also not incorrect to be conservative here.
            return False
        if self.ir_type == "boolean":
            return self.value is False
        if self.ir_type == "string":
            # smallest size and contains only the smallest-in-shrink-order character.
            minimal_char = self.kwargs["intervals"].char_in_shrink_order(0)
            return self.value == (minimal_char * self.kwargs["min_size"])
        if self.ir_type == "bytes":
            # smallest size and all-zero value.
            return len(self.value) == self.kwargs["size"] and not any(self.value)

        raise NotImplementedError(f"unhandled ir_type {self.ir_type}")

    def __eq__(self, other):
        if not isinstance(other, IRNode):
            return NotImplemented

        return (
            self.ir_type == other.ir_type
            and ir_value_equal(self.ir_type, self.value, other.value)
            and ir_kwargs_equal(self.ir_type, self.kwargs, other.kwargs)
            and self.was_forced == other.was_forced
        )

    def __hash__(self):
        return hash(
            (
                self.ir_type,
                ir_value_key(self.ir_type, self.value),
                ir_kwargs_key(self.ir_type, self.kwargs),
                self.was_forced,
            )
        )

    def __repr__(self):
        # repr to avoid "BytesWarning: str() on a bytes instance" for bytes nodes
        forced_marker = " [forced]" if self.was_forced else ""
        return f"{self.ir_type} {self.value!r}{forced_marker} {self.kwargs!r}"


def ir_value_permitted(value, ir_type, kwargs):
    if ir_type == "integer":
        min_value = kwargs["min_value"]
        max_value = kwargs["max_value"]
        shrink_towards = kwargs["shrink_towards"]
        if min_value is not None and value < min_value:
            return False
        if max_value is not None and value > max_value:
            return False

        if (max_value is None or min_value is None) and (
            value - shrink_towards
        ).bit_length() >= 128:
            return False

        return True
    elif ir_type == "float":
        if math.isnan(value):
            return kwargs["allow_nan"]
        return (
            sign_aware_lte(kwargs["min_value"], value)
            and sign_aware_lte(value, kwargs["max_value"])
        ) and not (0 < abs(value) < kwargs["smallest_nonzero_magnitude"])
    elif ir_type == "string":
        if len(value) < kwargs["min_size"]:
            return False
        if kwargs["max_size"] is not None and len(value) > kwargs["max_size"]:
            return False
        return all(ord(c) in kwargs["intervals"] for c in value)
    elif ir_type == "bytes":
        return len(value) == kwargs["size"]
    elif ir_type == "boolean":
        if kwargs["p"] <= 2 ** (-64):
            return value is False
        if kwargs["p"] >= (1 - 2 ** (-64)):
            return value is True
        return True

    raise NotImplementedError(f"unhandled type {type(value)} of ir value {value}")
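# For example (illustrative values, not from the test suite):
#
#     ir_value_permitted(5, "integer", {"min_value": 0, "max_value": 10,
#                                       "shrink_towards": 0})  # -> True
#     ir_value_permitted(b"ab", "bytes", {"size": 3})  # -> False (wrong length)
#
# i.e. it reports whether ``value`` could have been produced by a draw made
# with the given kwargs.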

def ir_value_key(ir_type, v):
    if ir_type == "float":
        return float_to_int(v)
    return v


def ir_kwargs_key(ir_type, kwargs):
    if ir_type == "float":
        return (
            float_to_int(kwargs["min_value"]),
            float_to_int(kwargs["max_value"]),
            kwargs["allow_nan"],
            kwargs["smallest_nonzero_magnitude"],
        )
    if ir_type == "integer":
        return (
            kwargs["min_value"],
            kwargs["max_value"],
            None if kwargs["weights"] is None else tuple(kwargs["weights"]),
            kwargs["shrink_towards"],
        )
    return tuple(kwargs[key] for key in sorted(kwargs))


def ir_value_equal(ir_type, v1, v2):
    return ir_value_key(ir_type, v1) == ir_value_key(ir_type, v2)
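# Note that keying floats by bit pattern means this differs from ``==``:
# 0.0 and -0.0 compare unequal, while a NaN compares equal to an identically
# encoded NaN, e.g. (illustrative):
#
#     ir_value_equal("float", 0.0, -0.0)           # -> False
#     ir_value_equal("float", math.nan, math.nan)  # -> True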

def ir_kwargs_equal(ir_type, kwargs1, kwargs2):
    return ir_kwargs_key(ir_type, kwargs1) == ir_kwargs_key(ir_type, kwargs2)


@dataclass_transform()
@attr.s(slots=True)
class ConjectureResult:
    """Result class storing the parts of ConjectureData that we
    will care about after the original ConjectureData has outlived its
    usefulness."""

    status: Status = attr.ib()
    interesting_origin: Optional[InterestingOrigin] = attr.ib()
    buffer: bytes = attr.ib()
    # some ConjectureDatas pass through the ir and some pass through buffers.
    # the ir does not drive its result through the buffer, which means blocks/examples
    # may differ (I think for forced values?) even when the buffer is the same.
    # I don't *think* anything was relying on anything but .buffer for result equality,
    # though that assumption may be leaning on flakiness detection invariants.
    #
    # If we consider blocks or examples in equality checks, multiple semantically equal
    # results get stored in e.g. the pareto front.
    blocks: Blocks = attr.ib(eq=False)
    output: str = attr.ib()
    extra_information: Optional[ExtraInformation] = attr.ib()
    has_discards: bool = attr.ib()
    target_observations: TargetObservations = attr.ib()
    tags: FrozenSet[StructuralCoverageTag] = attr.ib()
    forced_indices: FrozenSet[int] = attr.ib(repr=False)
    examples: Examples = attr.ib(repr=False, eq=False)
    arg_slices: Set[Tuple[int, int]] = attr.ib(repr=False)
    slice_comments: Dict[Tuple[int, int], str] = attr.ib(repr=False)
    invalid_at: Optional[InvalidAt] = attr.ib(repr=False)

    index: int = attr.ib(init=False)

    def __attrs_post_init__(self) -> None:
        self.index = len(self.buffer)
        self.forced_indices = frozenset(self.forced_indices)

    def as_result(self) -> "ConjectureResult":
        return self


# Masks for masking off the first byte of an n-bit buffer.
# The appropriate mask is stored at position n % 8.
BYTE_MASKS = [(1 << n) - 1 for n in range(8)]
BYTE_MASKS[0] = 255
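# e.g. a 12-bit draw spans two bytes, but only 12 % 8 == 4 bits of its leading
# byte are meaningful, so that byte gets masked with BYTE_MASKS[4] == 0x0F.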

class PrimitiveProvider(abc.ABC):
    # This is the low-level interface which would also be implemented
    # by e.g. CrossHair, by an Atheris-hypothesis integration, etc.
    # We'd then build the structured tree handling, database and replay
    # support, etc. on top of this - so all backends get those for free.
    #
    # See https://github.com/HypothesisWorks/hypothesis/issues/3086

    # How long a provider instance is used for. One of test_function or
    # test_case. Defaults to test_function.
    #
    # If test_function, a single provider instance will be instantiated and used
    # for the entirety of each test function. I.e., roughly one provider per
    # @given annotation. This can be useful if you need to track state over many
    # executions of a test function.
    #
    # This lifetime will cause None to be passed for the ConjectureData object
    # in PrimitiveProvider.__init__, because that object is instantiated per
    # test case.
    #
    # If test_case, a new provider instance will be instantiated and used each
    # time hypothesis tries to generate a new input to the test function. This
    # lifetime can access the passed ConjectureData object.
    #
    # Non-hypothesis providers probably want to set a lifetime of test_function.
    lifetime = "test_function"
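    # A minimal sketch of a third-party backend (illustrative only;
    # ``RandomProvider`` is a hypothetical name, not shipped with Hypothesis,
    # and assumes ``import random``):
    #
    #     class RandomProvider(PrimitiveProvider):
    #         lifetime = "test_function"
    #
    #         def draw_boolean(self, p=0.5, *, forced=None, fake_forced=False):
    #             return forced if forced is not None else random.random() < p
    #
    #         # ...and likewise for the other draw_* methods below.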

    def __init__(self, conjecturedata: Optional["ConjectureData"], /) -> None:
        self._cd = conjecturedata

    def post_test_case_hook(self, value):
        # hook for providers to modify values returned by draw_* after a full
        # test case concludes. Originally exposed for crosshair to reify its
        # symbolic values into actual values.
        # I'm not tied to this exact function name or design.
        return value

    def per_test_case_context_manager(self):
        return contextlib.nullcontext()

    @abc.abstractmethod
    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        fake_forced: bool = False,
    ) -> bool:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        # weights are for choosing an element index from a bounded range
        weights: Optional[Sequence[float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        fake_forced: bool = False,
    ) -> int:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_float(
        self,
        *,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float,
        # TODO: consider supporting these float widths at the IR level in the
        # future.
        # width: Literal[16, 32, 64] = 64,
        # exclude_min and exclude_max handled higher up,
        forced: Optional[float] = None,
        fake_forced: bool = False,
    ) -> float:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: Optional[int] = None,
        forced: Optional[str] = None,
        fake_forced: bool = False,
    ) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    def draw_bytes(
        self, size: int, *, forced: Optional[bytes] = None, fake_forced: bool = False
    ) -> bytes:
        raise NotImplementedError


class HypothesisProvider(PrimitiveProvider):
    lifetime = "test_case"

    def __init__(self, conjecturedata: Optional["ConjectureData"], /):
        assert conjecturedata is not None
        super().__init__(conjecturedata)

    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        fake_forced: bool = False,
    ) -> bool:
        """Return True with probability p (assuming a uniform generator),
        shrinking towards False. If ``forced`` is set to a non-None value, this
        will always return that value but will write choices appropriate to having
        drawn that value randomly."""
        # Note that this could also be implemented in terms of draw_integer().

        assert self._cd is not None
        # NB this function is vastly more complicated than it may seem reasonable
        # for it to be. This is because it is used in a lot of places and it's
        # important for it to shrink well, so it's worth the engineering effort.

        if p <= 0 or p >= 1:
            bits = 1
        else:
            # When there is a meaningful draw, in order to shrink well we will
            # set things up so that 0 and 1 always correspond to False and True
            # respectively. This means we want enough bits available that in a
            # draw we will always have at least one truthy value and one falsey
            # value.
            bits = math.ceil(-math.log(min(p, 1 - p), 2))
            # In order to avoid stupidly large draws where the probability is
            # effectively zero or one, we treat probabilities under 2^-64 as
            # effectively zero.
            if bits > 64:
                # There isn't enough precision near one for this to occur for values
                # far from 0.
                p = 0.0
                bits = 1

        size = 2**bits

        while True:
            # The logic here is a bit complicated and special cased to make it
            # play better with the shrinker.

            # We imagine partitioning the real interval [0, 1] into 2**n equal parts
            # and looking at each part and whether its interior is wholly <= p
            # or wholly >= p. At most one part can be neither.

            # We then pick a random part. If it's wholly on one side or the other
            # of p then we use that as the answer. If p is contained in the
            # interval then we start again with a new probability that is given
            # by the fraction of that interval that was <= our previous p.
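            # Worked example (illustrative): p = 0.3 gives bits = 2 and size = 4,
            # so falsey = floor(4 * 0.7) = 2, truthy = floor(4 * 0.3) = 1 and
            # remainder = 0.2. Drawing i = 0 or i = 2 yields False, i = 1 yields
            # True, and i = 3 (the partial part) loops again with p = 0.2.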

            # We then take advantage of the fact that we have control of the
            # labelling to make this shrink better, using the following tricks:

            # If p is <= 0 or >= 1 the result of this coin is certain. We make sure
            # to write a byte to the data stream anyway so that these don't cause
            # difficulties when shrinking.
            if p <= 0:
                self._cd.draw_bits(1, forced=0)
                result = False
            elif p >= 1:
                self._cd.draw_bits(1, forced=1)
                result = True
            else:
                falsey = floor(size * (1 - p))
                truthy = floor(size * p)
                remainder = size * p - truthy

                if falsey + truthy == size:
                    partial = False
                else:
                    partial = True

                i = self._cd.draw_bits(
                    bits,
                    forced=None if forced is None else int(forced),
                    fake_forced=fake_forced,
                )

                # We always choose the region that causes us to repeat the loop as
                # the maximum value, so that shrinking the drawn bits never causes
                # us to need to draw more data.
                if partial and i == size - 1:
                    p = remainder
                    continue
                if falsey == 0:
                    # Every other partition is truthy, so the result is true
                    result = True
                elif truthy == 0:
                    # Every other partition is falsey, so the result is false
                    result = False
                elif i <= 1:
                    # We special case so that zero is always false and 1 is always
                    # true which makes shrinking easier because we can always
                    # replace a truthy block with 1. This has the slightly weird
                    # property that shrinking from 2 to 1 can cause the result to
                    # grow, but the shrinker always tries 0 and 1 first anyway, so
                    # this will usually be fine.
                    result = bool(i)
                else:
                    # Originally everything in the region 0 <= i < falsey was false
                    # and everything above was true. We swapped one truthy element
                    # into this region, so the region becomes 0 <= i <= falsey
                    # except for i = 1. We know i > 1 here, so the test for truth
                    # becomes i > falsey.
                    result = i > falsey

            break
        return result

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        # weights are for choosing an element index from a bounded range
        weights: Optional[Sequence[float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        fake_forced: bool = False,
    ) -> int:
        assert self._cd is not None

        if min_value is not None:
            shrink_towards = max(min_value, shrink_towards)
        if max_value is not None:
            shrink_towards = min(max_value, shrink_towards)

        # This is easy to build on top of our existing conjecture utils,
        # and it's easy to build sampled_from and weighted_coin on this.
        if weights is not None:
            assert min_value is not None
            assert max_value is not None

            sampler = Sampler(weights, observe=False)
            gap = max_value - shrink_towards

            forced_idx = None
            if forced is not None:
                if forced >= shrink_towards:
                    forced_idx = forced - shrink_towards
                else:
                    forced_idx = shrink_towards + gap - forced
            idx = sampler.sample(self._cd, forced=forced_idx, fake_forced=fake_forced)

            # For range -2..2, interpret idx = 0..4 as [0, 1, 2, -1, -2]
            if idx <= gap:
                return shrink_towards + idx
            else:
                return shrink_towards - (idx - gap)

        if min_value is None and max_value is None:
            return self._draw_unbounded_integer(forced=forced, fake_forced=fake_forced)

        if min_value is None:
            assert max_value is not None  # make mypy happy
            probe = max_value + 1
            while max_value < probe:
                probe = shrink_towards + self._draw_unbounded_integer(
                    forced=None if forced is None else forced - shrink_towards,
                    fake_forced=fake_forced,
                )
            return probe

        if max_value is None:
            assert min_value is not None
            probe = min_value - 1
            while probe < min_value:
                probe = shrink_towards + self._draw_unbounded_integer(
                    forced=None if forced is None else forced - shrink_towards,
                    fake_forced=fake_forced,
                )
            return probe

        return self._draw_bounded_integer(
            min_value,
            max_value,
            center=shrink_towards,
            forced=forced,
            fake_forced=fake_forced,
        )

    def draw_float(
        self,
        *,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float,
        # TODO: consider supporting these float widths at the IR level in the
        # future.
        # width: Literal[16, 32, 64] = 64,
        # exclude_min and exclude_max handled higher up,
        forced: Optional[float] = None,
        fake_forced: bool = False,
    ) -> float:
        (
            sampler,
            forced_sign_bit,
            neg_clamper,
            pos_clamper,
            nasty_floats,
        ) = self._draw_float_init_logic(
            min_value=min_value,
            max_value=max_value,
            allow_nan=allow_nan,
            smallest_nonzero_magnitude=smallest_nonzero_magnitude,
        )

        assert self._cd is not None

        while True:
            # If `forced in nasty_floats`, then `forced` was *probably*
            # generated by drawing a nonzero index from the sampler. However, we
            # have no obligation to generate it that way when forcing. In particular,
            # i == 0 is able to produce all possible floats, and the forcing
            # logic is simpler if we assume this choice.
            forced_i = None if forced is None else 0
            i = (
                sampler.sample(self._cd, forced=forced_i, fake_forced=fake_forced)
                if sampler
                else 0
            )
            if i == 0:
                result = self._draw_float(
                    forced_sign_bit=forced_sign_bit,
                    forced=forced,
                    fake_forced=fake_forced,
                )
                if allow_nan and math.isnan(result):
                    clamped = result
                elif math.copysign(1.0, result) == -1:
                    assert neg_clamper is not None
                    clamped = -neg_clamper(-result)
                else:
                    assert pos_clamper is not None
                    clamped = pos_clamper(result)
                if clamped != result and not (math.isnan(result) and allow_nan):
                    self._draw_float(forced=clamped, fake_forced=fake_forced)
                    result = clamped
            else:
                result = nasty_floats[i - 1]
                # nan values generated via int_to_float break list membership:
                #
                # >>> n = 18444492273895866368
                # >>> assert math.isnan(int_to_float(n))
                # >>> assert int_to_float(n) not in [int_to_float(n)]
                #
                # because int_to_float nans are not equal in the sense of either
                # `a == b` or `a is b`.
                #
                # This can lead to flaky errors when collections require unique
                # floats. I think what is happening is that in some places we
                # provide math.nan, and in others we provide
                # int_to_float(float_to_int(math.nan)), and which one gets used
                # is not deterministic across test iterations.
                #
                # As a (temporary?) fix, we'll *always* generate nan values which
                # are not equal in the identity sense.
                #
                # see also https://github.com/HypothesisWorks/hypothesis/issues/3926.
                if math.isnan(result):
                    result = int_to_float(float_to_int(result))

                self._draw_float(forced=result, fake_forced=fake_forced)

            return result

    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: Optional[int] = None,
        forced: Optional[str] = None,
        fake_forced: bool = False,
    ) -> str:
        if max_size is None:
            max_size = DRAW_STRING_DEFAULT_MAX_SIZE

        assert forced is None or min_size <= len(forced) <= max_size
        assert self._cd is not None

        average_size = min(
            max(min_size * 2, min_size + 5),
            0.5 * (min_size + max_size),
        )
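        # e.g. (illustrative): min_size=0 with the default max_size gives
        # average_size = min(5, 5e9) = 5, so unconstrained strings stay
        # short on average.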

1578 

1579 chars = [] 

1580 elements = many( 

1581 self._cd, 

1582 min_size=min_size, 

1583 max_size=max_size, 

1584 average_size=average_size, 

1585 forced=None if forced is None else len(forced), 

1586 fake_forced=fake_forced, 

1587 observe=False, 

1588 ) 

1589 while elements.more(): 

1590 forced_i: Optional[int] = None 

1591 if forced is not None: 

1592 c = forced[elements.count - 1] 

1593 forced_i = intervals.index_from_char_in_shrink_order(c) 

1594 

1595 if len(intervals) > 256: 

1596 if self.draw_boolean( 

1597 0.2, 

1598 forced=None if forced_i is None else forced_i > 255, 

1599 fake_forced=fake_forced, 

1600 ): 

1601 i = self._draw_bounded_integer( 

1602 256, 

1603 len(intervals) - 1, 

1604 forced=forced_i, 

1605 fake_forced=fake_forced, 

1606 ) 

1607 else: 

1608 i = self._draw_bounded_integer( 

1609 0, 255, forced=forced_i, fake_forced=fake_forced 

1610 ) 

1611 else: 

1612 i = self._draw_bounded_integer( 

1613 0, len(intervals) - 1, forced=forced_i, fake_forced=fake_forced 

1614 ) 

1615 

1616 chars.append(intervals.char_in_shrink_order(i)) 

1617 

1618 return "".join(chars) 

1619 

1620 def draw_bytes( 

1621 self, size: int, *, forced: Optional[bytes] = None, fake_forced: bool = False 

1622 ) -> bytes: 

1623 forced_i = None 

1624 if forced is not None: 

1625 forced_i = int_from_bytes(forced) 

1626 size = len(forced) 

1627 

1628 assert self._cd is not None 

1629 return self._cd.draw_bits( 

1630 8 * size, forced=forced_i, fake_forced=fake_forced 

1631 ).to_bytes(size, "big") 

1632 

1633 def _draw_float( 

1634 self, 

1635 forced_sign_bit: Optional[int] = None, 

1636 *, 

1637 forced: Optional[float] = None, 

1638 fake_forced: bool = False, 

1639 ) -> float: 

1640 """ 

1641 Helper for draw_float which draws a random 64-bit float. 

1642 """ 

1643 assert self._cd is not None 

1644 

1645 if forced is not None: 

1646 # sign_aware_lte(forced, -0.0) does not correctly handle the 

1647 # math.nan case here. 

1648 forced_sign_bit = math.copysign(1, forced) == -1 

1649 is_negative = self._cd.draw_bits( 

1650 1, forced=forced_sign_bit, fake_forced=fake_forced 

1651 ) 

1652 f = lex_to_float( 

1653 self._cd.draw_bits( 

1654 64, 

1655 forced=None if forced is None else float_to_lex(abs(forced)), 

1656 fake_forced=fake_forced, 

1657 ) 

1658 ) 

1659 return -f if is_negative else f 

1660 
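# --- Editor's sketch (added; not part of the Hypothesis source). Why
# _draw_float recovers the sign bit with math.copysign: ordinary comparisons
# cannot see the sign of -0.0, and always fail for nan, whereas copysign
# reads the actual IEEE sign bit in both cases.
import math

assert not (-0.0 < 0)                        # comparison misses the sign of -0.0
assert math.copysign(1, -0.0) == -1          # copysign does not
_neg_nan = math.copysign(math.nan, -1.0)
assert math.copysign(1, _neg_nan) == -1      # works for nan too, where < always fails
# --- end sketch ---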

1661 def _draw_unbounded_integer( 

1662 self, *, forced: Optional[int] = None, fake_forced: bool = False 

1663 ) -> int: 

1664 assert self._cd is not None 

1665 forced_i = None 

1666 if forced is not None: 

1667 # Using any bucket large enough to contain this integer would be a 

1668 # valid way to force it. This is because an n bit integer could have 

1669 # been drawn from a bucket of size n, or from any bucket of size 

1670 # m > n. 

1671 # We'll always choose the smallest eligible bucket here. 

1672 

1673 # We need an extra bit to handle forced signed integers. INT_SIZES 

1674 # is interpreted as unsigned sizes. 

1675 bit_size = forced.bit_length() + 1 

1676 size = min(size for size in INT_SIZES if bit_size <= size) 

1677 forced_i = INT_SIZES.index(size) 

1678 

1679 size = INT_SIZES[ 

1680 INT_SIZES_SAMPLER.sample(self._cd, forced=forced_i, fake_forced=fake_forced) 

1681 ] 

1682 

1683 forced_r = None 

1684 if forced is not None: 

1685 forced_r = forced 

1686 forced_r <<= 1 

1687 if forced < 0: 

1688 forced_r = -forced_r 

1689 forced_r |= 1 

1690 

1691 r = self._cd.draw_bits(size, forced=forced_r, fake_forced=fake_forced) 

1692 sign = r & 1 

1693 r >>= 1 

1694 if sign: 

1695 r = -r 

1696 return r 

1697 
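# --- Editor's sketch (added; not part of the Hypothesis source). The signed
# integer encoding used by _draw_unbounded_integer, as a standalone round
# trip: the sign lives in the lowest bit and the magnitude in the rest.
def _encode(n: int) -> int:
    r = abs(n) << 1
    if n < 0:
        r |= 1
    return r

def _decode(r: int) -> int:
    n = r >> 1
    return -n if r & 1 else n

assert _encode(3) == 0b110 and _encode(-3) == 0b111
assert all(_decode(_encode(n)) == n for n in range(-1000, 1000))
# --- end sketch ---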

1698 def _draw_bounded_integer( 

1699 self, 

1700 lower: int, 

1701 upper: int, 

1702 *, 

1703 center: Optional[int] = None, 

1704 forced: Optional[int] = None, 

1705 fake_forced: bool = False, 

1706 _vary_effective_size: bool = True, 

1707 ) -> int: 

1708 assert lower <= upper 

1709 assert forced is None or lower <= forced <= upper 

1710 assert self._cd is not None 

1711 if lower == upper: 

1712 # Write a value even when this is trivial so that when a bound depends 

1713 # on other values we don't suddenly disappear when the gap shrinks to 

1714 # zero - if that happens then often the data stream becomes misaligned 

1715 # and we fail to shrink in cases where we really should be able to. 

1716 self._cd.draw_bits(1, forced=0) 

1717 return int(lower) 

1718 

1719 if center is None: 

1720 center = lower 

1721 center = min(max(center, lower), upper) 

1722 

1723 if center == upper: 

1724 above = False 

1725 elif center == lower: 

1726 above = True 

1727 else: 

1728 force_above = None if forced is None else forced < center 

1729 above = not self._cd.draw_bits( 

1730 1, forced=force_above, fake_forced=fake_forced 

1731 ) 

1732 

1733 if above: 

1734 gap = upper - center 

1735 else: 

1736 gap = center - lower 

1737 

1738 assert gap > 0 

1739 

1740 bits = gap.bit_length() 

1741 probe = gap + 1 

1742 

1743 if ( 

1744 bits > 24 

1745 and _vary_effective_size 

1746 and self.draw_boolean( 

1747 7 / 8, forced=None if forced is None else False, fake_forced=fake_forced 

1748 ) 

1749 ): 

1750 # For large ranges we usually (with probability 7/8) replace the uniform 

1751 # distribution from draw_bits with a size-weighted draw. The cutoff at 

1752 # 2 ** 24 keeps unicode character choice uniform; wider (e.g. 32-bit) ranges are not. 

1753 idx = INT_SIZES_SAMPLER.sample(self._cd) 

1754 force_bits = min(bits, INT_SIZES[idx]) 

1755 forced = self._draw_bounded_integer( 

1756 lower=center if above else max(lower, center - 2**force_bits - 1), 

1757 upper=center if not above else min(upper, center + 2**force_bits - 1), 

1758 _vary_effective_size=False, 

1759 ) 

1760 

1761 assert lower <= forced <= upper 

1762 

1763 while probe > gap: 

1764 probe = self._cd.draw_bits( 

1765 bits, 

1766 forced=None if forced is None else abs(forced - center), 

1767 fake_forced=fake_forced, 

1768 ) 

1769 

1770 if above: 

1771 result = center + probe 

1772 else: 

1773 result = center - probe 

1774 

1775 assert lower <= result <= upper 

1776 assert forced is None or result == forced, (result, forced, center, above) 

1777 return result 

1778 
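# --- Editor's sketch (added; not part of the Hypothesis source). The
# ``while probe > gap`` loop above is plain rejection sampling: drawing
# gap.bit_length() bits yields a value in [0, 2**bits), and anything above
# gap is redrawn. Since gap >= 2**(bits - 1), each draw is accepted with
# probability greater than 1/2, so the loop terminates quickly.
import random

def _uniform_up_to(rng: random.Random, gap: int) -> int:
    assert gap > 0
    bits = gap.bit_length()
    probe = gap + 1
    while probe > gap:           # reject values outside [0, gap]
        probe = rng.getrandbits(bits)
    return probe

_rng = random.Random(0)
assert all(0 <= _uniform_up_to(_rng, 5) <= 5 for _ in range(1000))
# --- end sketch ---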

1779 @classmethod 

1780 def _draw_float_init_logic( 

1781 cls, 

1782 *, 

1783 min_value: float, 

1784 max_value: float, 

1785 allow_nan: bool, 

1786 smallest_nonzero_magnitude: float, 

1787 ) -> Tuple[ 

1788 Optional[Sampler], 

1789 Optional[Literal[0, 1]], 

1790 Optional[Callable[[float], float]], 

1791 Optional[Callable[[float], float]], 

1792 List[float], 

1793 ]: 

1794 """ 

1795 Caches initialization logic for draw_float, as an alternative to 

1796 computing this for *every* float draw. 

1797 """ 

1798 # float_to_int allows us to distinguish between e.g. -0.0 and 0.0, 

1799 # even in light of hash(-0.0) == hash(0.0) and -0.0 == 0.0. 

1800 key = ( 

1801 float_to_int(min_value), 

1802 float_to_int(max_value), 

1803 allow_nan, 

1804 float_to_int(smallest_nonzero_magnitude), 

1805 ) 

1806 if key in FLOAT_INIT_LOGIC_CACHE: 

1807 return FLOAT_INIT_LOGIC_CACHE[key] 

1808 

1809 result = cls._compute_draw_float_init_logic( 

1810 min_value=min_value, 

1811 max_value=max_value, 

1812 allow_nan=allow_nan, 

1813 smallest_nonzero_magnitude=smallest_nonzero_magnitude, 

1814 ) 

1815 FLOAT_INIT_LOGIC_CACHE[key] = result 

1816 return result 

1817 
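# --- Editor's sketch (added; not part of the Hypothesis source). Why the
# cache key above goes through float_to_int: 0.0 and -0.0 are equal and
# hash alike, so as plain keys they would collide, while their bit patterns
# differ. The struct round trip below stands in for float_to_int.
import struct

def _bits(f: float) -> int:
    return struct.unpack("<Q", struct.pack("<d", f))[0]

assert 0.0 == -0.0 and hash(0.0) == hash(-0.0)  # indistinguishable as dict keys
assert len({0.0: None, -0.0: None}) == 1
assert _bits(0.0) == 0 and _bits(-0.0) == 1 << 63  # distinguishable bit patterns
# --- end sketch ---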

1818 @staticmethod 

1819 def _compute_draw_float_init_logic( 

1820 *, 

1821 min_value: float, 

1822 max_value: float, 

1823 allow_nan: bool, 

1824 smallest_nonzero_magnitude: float, 

1825 ) -> Tuple[ 

1826 Optional[Sampler], 

1827 Optional[Literal[0, 1]], 

1828 Optional[Callable[[float], float]], 

1829 Optional[Callable[[float], float]], 

1830 List[float], 

1831 ]: 

1832 if smallest_nonzero_magnitude == 0.0: # pragma: no cover 

1833 raise FloatingPointError( 

1834 "Got allow_subnormal=True, but we can't represent subnormal floats " 

1835 "right now, in violation of the IEEE-754 floating-point " 

1836 "specification. This is usually because something was compiled with " 

1837 "-ffast-math or a similar option, which sets global processor state. " 

1838 "See https://simonbyrne.github.io/notes/fastmath/ for a more detailed " 

1839 "writeup - and good luck!" 

1840 ) 

1841 

1842 def permitted(f): 

1843 assert isinstance(f, float) 

1844 if math.isnan(f): 

1845 return allow_nan 

1846 if 0 < abs(f) < smallest_nonzero_magnitude: 

1847 return False 

1848 return sign_aware_lte(min_value, f) and sign_aware_lte(f, max_value) 

1849 

1850 boundary_values = [ 

1851 min_value, 

1852 next_up(min_value), 

1853 min_value + 1, 

1854 max_value - 1, 

1855 next_down(max_value), 

1856 max_value, 

1857 ] 

1858 nasty_floats = [f for f in NASTY_FLOATS + boundary_values if permitted(f)] 

1859 weights = [0.2 * len(nasty_floats)] + [0.8] * len(nasty_floats) 

1860 sampler = Sampler(weights, observe=False) if nasty_floats else None 

1861 

1862 pos_clamper = neg_clamper = None 

1863 if sign_aware_lte(0.0, max_value): 

1864 pos_min = max(min_value, smallest_nonzero_magnitude) 

1865 allow_zero = sign_aware_lte(min_value, 0.0) 

1866 pos_clamper = make_float_clamper(pos_min, max_value, allow_zero=allow_zero) 

1867 if sign_aware_lte(min_value, -0.0): 

1868 neg_max = min(max_value, -smallest_nonzero_magnitude) 

1869 allow_zero = sign_aware_lte(-0.0, max_value) 

1870 neg_clamper = make_float_clamper( 

1871 -neg_max, -min_value, allow_zero=allow_zero 

1872 ) 

1873 

1874 forced_sign_bit: Optional[Literal[0, 1]] = None 

1875 if (pos_clamper is None) != (neg_clamper is None): 

1876 forced_sign_bit = 1 if neg_clamper else 0 

1877 

1878 return (sampler, forced_sign_bit, neg_clamper, pos_clamper, nasty_floats) 

1879 
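# --- Editor's sketch (added; not part of the Hypothesis source). The weight
# vector built above normalises so that index 0 (meaning "draw a uniformly
# random float") is chosen 20% of the time, with the remaining 80% split
# evenly across the permitted nasty/boundary floats.
_n = 7  # e.g. seven nasty floats survive the permitted() filter
_weights = [0.2 * _n] + [0.8] * _n
_total = sum(_weights)
assert abs(_weights[0] / _total - 0.2) < 1e-9
assert abs(sum(_weights[1:]) / _total - 0.8) < 1e-9
# --- end sketch ---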

1880 

1881# The set of available `PrimitiveProvider`s, by name. Other libraries, such as 

1882 # crosshair, can implement this interface and add themselves, at which point users 

1883# can configure which backend to use via settings. Keys are the name of the library, 

1884# which doubles as the backend= setting, and values are importable class names. 

1885# 

1886# NOTE: this is a temporary interface. We DO NOT promise to continue supporting it! 

1887# (but if you want to experiment and don't mind breakage, here you go) 

1888AVAILABLE_PROVIDERS = { 

1889 "hypothesis": "hypothesis.internal.conjecture.data.HypothesisProvider", 

1890} 

1891 
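# --- Editor's sketch (added; not part of the Hypothesis source). How a
# third-party backend might hook in under the experimental interface
# described above; "mybackend" and "mypackage.provider.MyProvider" are
# hypothetical names, not a real library:
#
#     AVAILABLE_PROVIDERS["mybackend"] = "mypackage.provider.MyProvider"
#
# after which users could opt in via settings(backend="mybackend").
# --- end sketch ---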

1892 

1893class ConjectureData: 

1894 @classmethod 

1895 def for_buffer( 

1896 cls, 

1897 buffer: Union[List[int], bytes], 

1898 *, 

1899 observer: Optional[DataObserver] = None, 

1900 provider: Union[type, PrimitiveProvider] = HypothesisProvider, 

1901 ) -> "ConjectureData": 

1902 return cls( 

1903 len(buffer), buffer, random=None, observer=observer, provider=provider 

1904 ) 

1905 

1906 @classmethod 

1907 def for_ir_tree( 

1908 cls, 

1909 ir_tree_prefix: List[IRNode], 

1910 *, 

1911 observer: Optional[DataObserver] = None, 

1912 provider: Union[type, PrimitiveProvider] = HypothesisProvider, 

1913 max_length: Optional[int] = None, 

1914 ) -> "ConjectureData": 

1915 from hypothesis.internal.conjecture.engine import BUFFER_SIZE 

1916 

1917 return cls( 

1918 max_length=BUFFER_SIZE if max_length is None else max_length, 

1919 prefix=b"", 

1920 random=None, 

1921 ir_tree_prefix=ir_tree_prefix, 

1922 observer=observer, 

1923 provider=provider, 

1924 ) 

1925 

1926 def __init__( 

1927 self, 

1928 max_length: int, 

1929 prefix: Union[List[int], bytes, bytearray], 

1930 *, 

1931 random: Optional[Random], 

1932 observer: Optional[DataObserver] = None, 

1933 provider: Union[type, PrimitiveProvider] = HypothesisProvider, 

1934 ir_tree_prefix: Optional[List[IRNode]] = None, 

1935 ) -> None: 

1936 if observer is None: 

1937 observer = DataObserver() 

1938 assert isinstance(observer, DataObserver) 

1939 self._bytes_drawn = 0 

1940 self.observer = observer 

1941 self.max_length = max_length 

1942 self.is_find = False 

1943 self.overdraw = 0 

1944 self.__prefix = bytes(prefix) 

1945 self.__random = random 

1946 

1947 if ir_tree_prefix is None: 

1948 assert random is not None or max_length <= len(prefix) 

1949 

1950 self.blocks = Blocks(self) 

1951 self.buffer: "Union[bytes, bytearray]" = bytearray() 

1952 self.index = 0 

1953 self.output = "" 

1954 self.status = Status.VALID 

1955 self.frozen = False 

1956 global global_test_counter 

1957 self.testcounter = global_test_counter 

1958 global_test_counter += 1 

1959 self.start_time = time.perf_counter() 

1960 self.gc_start_time = gc_cumulative_time() 

1961 self.events: Dict[str, Union[str, int, float]] = {} 

1962 self.forced_indices: "Set[int]" = set() 

1963 self.interesting_origin: Optional[InterestingOrigin] = None 

1964 self.draw_times: "Dict[str, float]" = {} 

1965 self._stateful_run_times: "DefaultDict[str, float]" = defaultdict(float) 

1966 self.max_depth = 0 

1967 self.has_discards = False 

1968 

1969 self.provider = provider(self) if isinstance(provider, type) else provider 

1970 assert isinstance(self.provider, PrimitiveProvider) 

1971 

1972 self.__result: "Optional[ConjectureResult]" = None 

1973 

1974 # Observations used for targeted search. They'll be aggregated in 

1975 # ConjectureRunner.generate_new_examples and fed to TargetSelector. 

1976 self.target_observations: TargetObservations = {} 

1977 

1978 # Tags which indicate something about which part of the search space 

1979 # this example is in. These are used to guide generation. 

1980 self.tags: "Set[StructuralCoverageTag]" = set() 

1981 self.labels_for_structure_stack: "List[Set[int]]" = [] 

1982 

1983 # Normally unpopulated but we need this in the niche case 

1984 # that self.as_result() is Overrun but we still want the 

1985 # examples for reporting purposes. 

1986 self.__examples: "Optional[Examples]" = None 

1987 

1988 # We want the top level example to have depth 0, so we start 

1989 # at -1. 

1990 self.depth = -1 

1991 self.__example_record = ExampleRecord() 

1992 

1993 # Slice indices for discrete reportable parts that which-parts-matter can 

1994 # try varying, to report if the minimal example always fails anyway. 

1995 self.arg_slices: Set[Tuple[int, int]] = set() 

1996 self.slice_comments: Dict[Tuple[int, int], str] = {} 

1997 self._observability_args: Dict[str, Any] = {} 

1998 self._observability_predicates: defaultdict = defaultdict( 

1999 lambda: {"satisfied": 0, "unsatisfied": 0} 

2000 ) 

2001 

2002 self.extra_information = ExtraInformation() 

2003 

2004 self.ir_tree_nodes = ir_tree_prefix 

2005 self.invalid_at: Optional[InvalidAt] = None 

2006 self._node_index = 0 

2007 self.start_example(TOP_LABEL) 

2008 

2009 def __repr__(self): 

2010 return "ConjectureData(%s, %d bytes%s)" % ( 

2011 self.status.name, 

2012 len(self.buffer), 

2013 ", frozen" if self.frozen else "", 

2014 ) 

2015 

2016 # A bit of explanation of the `observe` and `fake_forced` arguments in our 

2017 # draw_* functions. 

2018 # 

2019 # There are two types of draws: sub-ir and super-ir. For instance, some ir 

2020 # nodes use `many`, which in turn calls draw_boolean. But some strategies 

2021 # also use many, at the super-ir level. We don't want to write sub-ir draws 

2022 # to the DataTree (and consequently use them when computing novel prefixes), 

2023 # since they are fully recorded by writing the ir node itself. 

2024 # But super-ir draws are not included in the ir node, so we do want to write 

2025 # these to the tree. 

2026 # 

2027 # `observe` formalizes this distinction. The draw will only be written to 

2028 # the DataTree if observe is True. 

2029 # 

2030 # `fake_forced` deals with a different problem. We use `forced=` to convert 

2031 # ir prefixes, which are potentially from other backends, into our backing 

2032 # bits representation. This works fine, except using `forced=` in this way 

2033 # also sets `was_forced=True` for all blocks, even those that weren't forced 

2034 # in the traditional way. The shrinker chokes on this due to thinking that 

2035 # nothing can be modified. 

2036 # 

2037 # Setting `fake_forced` to true says that yes, we want to force a particular 

2038 # value to be returned, but we don't want to treat that block as fixed for 

2039 # e.g. the shrinker. 

2040 
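# --- Editor's sketch (added; not part of the Hypothesis source). The
# intended effect of fake_forced under the scheme described above, shown
# as a hypothetical usage rather than executable test code: the draw
# returns the forced value, but the underlying bytes are not recorded as
# forced, so the shrinker remains free to modify them.
#
#     data = ConjectureData.for_buffer(bytes(16))
#     value = data.draw_boolean(0.5, forced=True, fake_forced=True)
#     assert value
#     assert not data.forced_indices   # nothing was marked as forced
#
# With fake_forced=False (the default), the same draw would populate
# data.forced_indices.
# --- end sketch ---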

2041 def draw_integer( 

2042 self, 

2043 min_value: Optional[int] = None, 

2044 max_value: Optional[int] = None, 

2045 *, 

2046 # weights are for choosing an element index from a bounded range 

2047 weights: Optional[Sequence[float]] = None, 

2048 shrink_towards: int = 0, 

2049 forced: Optional[int] = None, 

2050 fake_forced: bool = False, 

2051 observe: bool = True, 

2052 ) -> int: 

2053 # Validate arguments 

2054 if weights is not None: 

2055 assert min_value is not None 

2056 assert max_value is not None 

2057 width = max_value - min_value + 1 

2058 assert width <= 255 # arbitrary practical limit 

2059 assert len(weights) == width 

2060 

2061 if forced is not None and (min_value is None or max_value is None): 

2062 # We draw `forced=forced - shrink_towards` here internally. If that 

2063 # grows larger than a 128 bit signed integer, we can't represent it. 

2064 # Disallow this combination for now. 

2065 # Note that bit_length() = 128 -> signed bit size = 129. 

2066 assert (forced - shrink_towards).bit_length() < 128 

2067 if forced is not None and min_value is not None: 

2068 assert min_value <= forced 

2069 if forced is not None and max_value is not None: 

2070 assert forced <= max_value 

2071 

2072 kwargs: IntegerKWargs = self._pooled_kwargs( 

2073 "integer", 

2074 { 

2075 "min_value": min_value, 

2076 "max_value": max_value, 

2077 "weights": weights, 

2078 "shrink_towards": shrink_towards, 

2079 }, 

2080 ) 

2081 

2082 if self.ir_tree_nodes is not None and observe: 

2083 node = self._pop_ir_tree_node("integer", kwargs, forced=forced) 

2084 if forced is None: 

2085 assert isinstance(node.value, int) 

2086 forced = node.value 

2087 fake_forced = True 

2088 

2089 value = self.provider.draw_integer( 

2090 **kwargs, forced=forced, fake_forced=fake_forced 

2091 ) 

2092 if observe: 

2093 self.observer.draw_integer( 

2094 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced 

2095 ) 

2096 self.__example_record.record_ir_draw( 

2097 "integer", 

2098 value, 

2099 kwargs=kwargs, 

2100 was_forced=forced is not None and not fake_forced, 

2101 ) 

2102 return value 

2103 

2104 def draw_float( 

2105 self, 

2106 min_value: float = -math.inf, 

2107 max_value: float = math.inf, 

2108 *, 

2109 allow_nan: bool = True, 

2110 smallest_nonzero_magnitude: float = SMALLEST_SUBNORMAL, 

2111 # TODO: consider supporting these float widths at the IR level in the 

2112 # future. 

2113 # width: Literal[16, 32, 64] = 64, 

2114 # exclude_min and exclude_max handled higher up, 

2115 forced: Optional[float] = None, 

2116 fake_forced: bool = False, 

2117 observe: bool = True, 

2118 ) -> float: 

2119 assert smallest_nonzero_magnitude > 0 

2120 assert not math.isnan(min_value) 

2121 assert not math.isnan(max_value) 

2122 

2123 if forced is not None: 

2124 assert allow_nan or not math.isnan(forced) 

2125 assert math.isnan(forced) or ( 

2126 sign_aware_lte(min_value, forced) and sign_aware_lte(forced, max_value) 

2127 ) 

2128 

2129 kwargs: FloatKWargs = self._pooled_kwargs( 

2130 "float", 

2131 { 

2132 "min_value": min_value, 

2133 "max_value": max_value, 

2134 "allow_nan": allow_nan, 

2135 "smallest_nonzero_magnitude": smallest_nonzero_magnitude, 

2136 }, 

2137 ) 

2138 

2139 if self.ir_tree_nodes is not None and observe: 

2140 node = self._pop_ir_tree_node("float", kwargs, forced=forced) 

2141 if forced is None: 

2142 assert isinstance(node.value, float) 

2143 forced = node.value 

2144 fake_forced = True 

2145 

2146 value = self.provider.draw_float( 

2147 **kwargs, forced=forced, fake_forced=fake_forced 

2148 ) 

2149 if observe: 

2150 self.observer.draw_float( 

2151 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced 

2152 ) 

2153 self.__example_record.record_ir_draw( 

2154 "float", 

2155 value, 

2156 kwargs=kwargs, 

2157 was_forced=forced is not None and not fake_forced, 

2158 ) 

2159 return value 

2160 

2161 def draw_string( 

2162 self, 

2163 intervals: IntervalSet, 

2164 *, 

2165 min_size: int = 0, 

2166 max_size: Optional[int] = None, 

2167 forced: Optional[str] = None, 

2168 fake_forced: bool = False, 

2169 observe: bool = True, 

2170 ) -> str: 

2171 assert forced is None or min_size <= len(forced) 

2172 

2173 kwargs: StringKWargs = self._pooled_kwargs( 

2174 "string", 

2175 { 

2176 "intervals": intervals, 

2177 "min_size": min_size, 

2178 "max_size": max_size, 

2179 }, 

2180 ) 

2181 if self.ir_tree_nodes is not None and observe: 

2182 node = self._pop_ir_tree_node("string", kwargs, forced=forced) 

2183 if forced is None: 

2184 assert isinstance(node.value, str) 

2185 forced = node.value 

2186 fake_forced = True 

2187 

2188 value = self.provider.draw_string( 

2189 **kwargs, forced=forced, fake_forced=fake_forced 

2190 ) 

2191 if observe: 

2192 self.observer.draw_string( 

2193 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced 

2194 ) 

2195 self.__example_record.record_ir_draw( 

2196 "string", 

2197 value, 

2198 kwargs=kwargs, 

2199 was_forced=forced is not None and not fake_forced, 

2200 ) 

2201 return value 

2202 

2203 def draw_bytes( 

2204 self, 

2205 # TODO move to min_size and max_size here. 

2206 size: int, 

2207 *, 

2208 forced: Optional[bytes] = None, 

2209 fake_forced: bool = False, 

2210 observe: bool = True, 

2211 ) -> bytes: 

2212 assert forced is None or len(forced) == size 

2213 assert size >= 0 

2214 

2215 kwargs: BytesKWargs = self._pooled_kwargs("bytes", {"size": size}) 

2216 

2217 if self.ir_tree_nodes is not None and observe: 

2218 node = self._pop_ir_tree_node("bytes", kwargs, forced=forced) 

2219 if forced is None: 

2220 assert isinstance(node.value, bytes) 

2221 forced = node.value 

2222 fake_forced = True 

2223 

2224 value = self.provider.draw_bytes( 

2225 **kwargs, forced=forced, fake_forced=fake_forced 

2226 ) 

2227 if observe: 

2228 self.observer.draw_bytes( 

2229 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced 

2230 ) 

2231 self.__example_record.record_ir_draw( 

2232 "bytes", 

2233 value, 

2234 kwargs=kwargs, 

2235 was_forced=forced is not None and not fake_forced, 

2236 ) 

2237 return value 

2238 

2239 def draw_boolean( 

2240 self, 

2241 p: float = 0.5, 

2242 *, 

2243 forced: Optional[bool] = None, 

2244 fake_forced: bool = False, 

2245 observe: bool = True, 

2246 ) -> bool: 

2247 # Internally, we treat probabilities lower than 1 / 2**64 as 

2248 # unconditionally false. 

2249 # 

2250 # Note that even if we lift this 64 bit restriction in the future, p 

2251 # cannot be 0 when forced is True, nor 1 when forced is False. 

2252 if forced is True: 

2253 assert p > 2 ** (-64) 

2254 if forced is False: 

2255 assert p < (1 - 2 ** (-64)) 

2256 

2257 kwargs: BooleanKWargs = self._pooled_kwargs("boolean", {"p": p}) 

2258 

2259 if self.ir_tree_nodes is not None and observe: 

2260 node = self._pop_ir_tree_node("boolean", kwargs, forced=forced) 

2261 if forced is None: 

2262 assert isinstance(node.value, bool) 

2263 forced = node.value 

2264 fake_forced = True 

2265 

2266 value = self.provider.draw_boolean( 

2267 **kwargs, forced=forced, fake_forced=fake_forced 

2268 ) 

2269 if observe: 

2270 self.observer.draw_boolean( 

2271 value, kwargs=kwargs, was_forced=forced is not None and not fake_forced 

2272 ) 

2273 self.__example_record.record_ir_draw( 

2274 "boolean", 

2275 value, 

2276 kwargs=kwargs, 

2277 was_forced=forced is not None and not fake_forced, 

2278 ) 

2279 return value 

2280 

2281 def _pooled_kwargs(self, ir_type, kwargs): 

2282 """Memoize common dictionary objects to reduce memory pressure.""" 

2283 key = [] 

2284 for k, v in kwargs.items(): 

2285 if ir_type == "float" and k in ["min_value", "max_value"]: 

2286 # handle -0.0 vs 0.0, etc. 

2287 v = float_to_int(v) 

2288 elif ir_type == "integer" and k == "weights": 

2289 # make hashable 

2290 v = v if v is None else tuple(v) 

2291 key.append((k, v)) 

2292 

2293 key = (ir_type, *sorted(key)) 

2294 

2295 try: 

2296 return POOLED_KWARGS_CACHE[key] 

2297 except KeyError: 

2298 POOLED_KWARGS_CACHE[key] = kwargs 

2299 return kwargs 

2300 
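# --- Editor's sketch (added; not part of the Hypothesis source). The
# memoisation pattern used by _pooled_kwargs, reduced to its core: equal
# kwargs dicts from different draw sites collapse onto a single shared
# object, so a run with millions of draws keeps one copy per distinct
# parameter set.
_POOL: dict = {}

def _pooled(kwargs: dict) -> dict:
    key = tuple(sorted(kwargs.items()))
    try:
        return _POOL[key]
    except KeyError:
        _POOL[key] = kwargs
        return kwargs

a = _pooled({"min_value": 0, "max_value": 10})
b = _pooled({"min_value": 0, "max_value": 10})
assert a is b  # the second, equal dict is dropped in favour of the pooled one
# --- end sketch ---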

2301 def _pop_ir_tree_node( 

2302 self, ir_type: IRTypeName, kwargs: IRKWargsType, *, forced: Optional[IRType] 

2303 ) -> IRNode: 

2304 assert self.ir_tree_nodes is not None 

2305 

2306 if self._node_index == len(self.ir_tree_nodes): 

2307 self.mark_overrun() 

2308 

2309 node = self.ir_tree_nodes[self._node_index] 

2310 # If we're trying to draw a different ir type at the same location, then 

2311 # this ir tree has become badly misaligned. We don't have many good/simple 

2312 # options here for realigning beyond giving up. 

2313 # 

2314 # This is more of an issue for ir nodes while shrinking than it was for 

2315 # buffers: misaligned buffers are still usually valid, just interpreted 

2316 # differently. This would be somewhat like drawing a random value for 

2317 # the new ir type here. For what it's worth, misaligned buffers are 

2318 # rather unlikely to be *useful* buffers, so giving up isn't a big downgrade. 

2319 # (in fact, it is possible that giving up early here results in more time 

2320 # for useful shrinks to run). 

2321 if node.ir_type != ir_type: 

2322 invalid_at = (ir_type, kwargs, forced) 

2323 self.invalid_at = invalid_at 

2324 self.observer.mark_invalid(invalid_at) 

2325 self.mark_invalid(f"(internal) want a {ir_type} but have a {node.ir_type}") 

2326 

2327 # if a node has different kwargs (and so is misaligned), but has a value 

2328 # that is allowed by the expected kwargs, then we can coerce this node 

2329 # into an aligned one by using its value. It's unclear how useful this is. 

2330 if not ir_value_permitted(node.value, node.ir_type, kwargs): 

2331 invalid_at = (ir_type, kwargs, forced) 

2332 self.invalid_at = invalid_at 

2333 self.observer.mark_invalid(invalid_at) 

2334 self.mark_invalid(f"(internal) got a {ir_type} but outside the valid range") 

2335 

2336 self._node_index += 1 

2337 return node 

2338 

2339 def as_result(self) -> Union[ConjectureResult, _Overrun]: 

2340 """Convert the result of running this test into 

2341 either an Overrun object or a ConjectureResult.""" 

2342 

2343 assert self.frozen 

2344 if self.status == Status.OVERRUN: 

2345 return Overrun 

2346 if self.__result is None: 

2347 self.__result = ConjectureResult( 

2348 status=self.status, 

2349 interesting_origin=self.interesting_origin, 

2350 buffer=self.buffer, 

2351 examples=self.examples, 

2352 blocks=self.blocks, 

2353 output=self.output, 

2354 extra_information=( 

2355 self.extra_information 

2356 if self.extra_information.has_information() 

2357 else None 

2358 ), 

2359 has_discards=self.has_discards, 

2360 target_observations=self.target_observations, 

2361 tags=frozenset(self.tags), 

2362 forced_indices=frozenset(self.forced_indices), 

2363 arg_slices=self.arg_slices, 

2364 slice_comments=self.slice_comments, 

2365 invalid_at=self.invalid_at, 

2366 ) 

2367 assert self.__result is not None 

2368 self.blocks.transfer_ownership(self.__result) 

2369 return self.__result 

2370 

2371 def __assert_not_frozen(self, name: str) -> None: 

2372 if self.frozen: 

2373 raise Frozen(f"Cannot call {name} on frozen ConjectureData") 

2374 

2375 def note(self, value: Any) -> None: 

2376 self.__assert_not_frozen("note") 

2377 if not isinstance(value, str): 

2378 value = repr(value) 

2379 self.output += value 

2380 

2381 def draw( 

2382 self, 

2383 strategy: "SearchStrategy[Ex]", 

2384 label: Optional[int] = None, 

2385 observe_as: Optional[str] = None, 

2386 ) -> "Ex": 

2387 if self.is_find and not strategy.supports_find: 

2388 raise InvalidArgument( 

2389 f"Cannot use strategy {strategy!r} within a call to find " 

2390 "(presumably because it would be invalid after the call had ended)." 

2391 ) 

2392 

2393 at_top_level = self.depth == 0 

2394 start_time = None 

2395 if at_top_level: 

2396 # We start this timer early, because accessing attributes on a LazyStrategy 

2397 # can be almost arbitrarily slow. In cases like characters() and text() 

2398 # where we cache something expensive, this led to Flaky deadline errors! 

2399 # See https://github.com/HypothesisWorks/hypothesis/issues/2108 

2400 start_time = time.perf_counter() 

2401 gc_start_time = gc_cumulative_time() 

2402 

2403 strategy.validate() 

2404 

2405 if strategy.is_empty: 

2406 self.mark_invalid(f"empty strategy {self!r}") 

2407 

2408 if self.depth >= MAX_DEPTH: 

2409 self.mark_invalid("max depth exceeded") 

2410 

2411 if label is None: 

2412 assert isinstance(strategy.label, int) 

2413 label = strategy.label 

2414 self.start_example(label=label) 

2415 try: 

2416 if not at_top_level: 

2417 return strategy.do_draw(self) 

2418 assert start_time is not None 

2419 key = observe_as or f"generate:unlabeled_{len(self.draw_times)}" 

2420 try: 

2421 strategy.validate() 

2422 try: 

2423 return strategy.do_draw(self) 

2424 finally: 

2425 # Subtract the time spent in GC to avoid overcounting, as it is 

2426 # accounted for at the overall example level. 

2427 in_gctime = gc_cumulative_time() - gc_start_time 

2428 self.draw_times[key] = time.perf_counter() - start_time - in_gctime 

2429 except Exception as err: 

2430 add_note(err, f"while generating {key[9:]!r} from {strategy!r}") 

2431 raise 

2432 finally: 

2433 self.stop_example() 

2434 

2435 def start_example(self, label: int) -> None: 

2436 self.__assert_not_frozen("start_example") 

2437 self.depth += 1 

2438 # Logically it would make sense for this to just be 

2439 # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to 

2440 # be until we ran the code under tracemalloc and found a rather significant 

2441 # chunk of allocation was happening here. This was presumably due to varargs 

2442 # or the like, but we didn't investigate further given that it was easy 

2443 # to fix with this check. 

2444 if self.depth > self.max_depth: 

2445 self.max_depth = self.depth 

2446 self.__example_record.start_example(label) 

2447 self.labels_for_structure_stack.append({label}) 

2448 

2449 def stop_example(self, *, discard: bool = False) -> None: 

2450 if self.frozen: 

2451 return 

2452 if discard: 

2453 self.has_discards = True 

2454 self.depth -= 1 

2455 assert self.depth >= -1 

2456 self.__example_record.stop_example(discard=discard) 

2457 

2458 labels_for_structure = self.labels_for_structure_stack.pop() 

2459 

2460 if not discard: 

2461 if self.labels_for_structure_stack: 

2462 self.labels_for_structure_stack[-1].update(labels_for_structure) 

2463 else: 

2464 self.tags.update([structural_coverage(l) for l in labels_for_structure]) 

2465 

2466 if discard: 

2467 # Once we've discarded an example, every test case starting with 

2468 # this prefix contains discards. We prune the tree at that point so 

2469 # as to avoid future test cases bothering with this region, on the 

2470 # assumption that some example that you could have used instead 

2471 # there would *not* trigger the discard. This greatly speeds up 

2472 # test case generation in some cases, because it allows us to 

2473 # ignore large swathes of the search space that are effectively 

2474 # redundant. 

2475 # 

2476 # A scenario that can cause us problems but which we deliberately 

2477 # have decided not to support is that if there are side effects 

2478 # during data generation then you may end up with a scenario where 

2479 # every good test case generates a discard because the discarded 

2480 # section sets up important things for later. This is not terribly 

2481 # likely and all that you see in this case is some degradation in 

2482 # quality of testing, so we don't worry about it. 

2483 # 

2484 # Note that killing the branch does *not* mean we will never 

2485 # explore below this point, and in particular we may do so during 

2486 # shrinking. Any explicit request for a data object that starts 

2487 # with the branch here will work just fine, but novel prefix 

2488 # generation will avoid it, and we can use it to detect when we 

2489 # have explored the entire tree (up to redundancy). 

2490 

2491 self.observer.kill_branch() 

2492 

2493 @property 

2494 def examples(self) -> Examples: 

2495 assert self.frozen 

2496 if self.__examples is None: 

2497 self.__examples = Examples(record=self.__example_record, blocks=self.blocks) 

2498 return self.__examples 

2499 

2500 def freeze(self) -> None: 

2501 if self.frozen: 

2502 assert isinstance(self.buffer, bytes) 

2503 return 

2504 self.finish_time = time.perf_counter() 

2505 self.gc_finish_time = gc_cumulative_time() 

2506 assert len(self.buffer) == self.index 

2507 

2508 # Always finish by closing all remaining examples so that we have a 

2509 # valid tree. 

2510 while self.depth >= 0: 

2511 self.stop_example() 

2512 

2513 self.__example_record.freeze() 

2514 

2515 self.frozen = True 

2516 

2517 self.buffer = bytes(self.buffer) 

2518 

2519 # if we were invalid because of a misalignment in the tree, we don't 

2520 # want to tell the DataTree that. Doing so would lead to inconsistent behavior. 

2521 # Given an empty DataTree 

2522 # ┌──────┐ 

2523 # │ root │ 

2524 # └──────┘ 

2525 # and supposing the very first draw is misaligned, concluding here would 

2526 # tell the datatree that the *only* possibility at the root node is Status.INVALID: 

2527 # ┌──────┐ 

2528 # │ root │ 

2529 # └──┬───┘ 

2530 # ┌───────────┴───────────────┐ 

2531 # │ Conclusion(Status.INVALID)│ 

2532 # └───────────────────────────┘ 

2533 # when in fact this is only the case when we try to draw a misaligned node. 

2534 # For instance, suppose we come along in the second test case and try a 

2535 # valid node as the first draw from the root. The DataTree thinks this 

2536 # is flaky (because root must lead to Status.INVALID in the tree) while 

2537 # in fact nothing in the test function has changed and the only change 

2538 # is in the ir tree prefix we are supplying. 

2539 # 

2540 # From the perspective of DataTree, it is safe to not conclude here. This 

2541 # tells the datatree that we don't know what happens after this node - which 

2542 # is true! We are aborting early here because the ir tree became misaligned, 

2543 # which is a semantically different kind of invalidity from a failed assume or filter. 

2544 if self.invalid_at is None: 

2545 self.observer.conclude_test(self.status, self.interesting_origin) 

2546 

2547 def choice( 

2548 self, 

2549 values: Sequence[T], 

2550 *, 

2551 forced: Optional[T] = None, 

2552 fake_forced: bool = False, 

2553 observe: bool = True, 

2554 ) -> T: 

2555 forced_i = None if forced is None else values.index(forced) 

2556 i = self.draw_integer( 

2557 0, 

2558 len(values) - 1, 

2559 forced=forced_i, 

2560 fake_forced=fake_forced, 

2561 observe=observe, 

2562 ) 

2563 return values[i] 

2564 

2565 def draw_bits( 

2566 self, n: int, *, forced: Optional[int] = None, fake_forced: bool = False 

2567 ) -> int: 

2568 """Return an ``n``-bit integer from the underlying source of 

2569 bytes. If ``forced`` is set to an integer, we instead 

2570 ignore the underlying source and simulate a draw as if it had 

2571 returned that integer.""" 

2572 self.__assert_not_frozen("draw_bits") 

2573 if n == 0: 

2574 return 0 

2575 assert n > 0 

2576 n_bytes = bits_to_bytes(n) 

2577 self.__check_capacity(n_bytes) 

2578 

2579 if forced is not None: 

2580 buf = int_to_bytes(forced, n_bytes) 

2581 elif self._bytes_drawn < len(self.__prefix): 

2582 index = self._bytes_drawn 

2583 buf = self.__prefix[index : index + n_bytes] 

2584 if len(buf) < n_bytes: 

2585 assert self.__random is not None 

2586 buf += uniform(self.__random, n_bytes - len(buf)) 

2587 else: 

2588 assert self.__random is not None 

2589 buf = uniform(self.__random, n_bytes) 

2590 buf = bytearray(buf) 

2591 self._bytes_drawn += n_bytes 

2592 

2593 assert len(buf) == n_bytes 

2594 

2595 # If we have a number of bits that is not a multiple of 8 

2596 # we have to mask off the high bits. 

2597 buf[0] &= BYTE_MASKS[n % 8] 

2598 buf = bytes(buf) 

2599 result = int_from_bytes(buf) 

2600 

2601 self.__example_record.draw_bits() 

2602 

2603 initial = self.index 

2604 

2605 assert isinstance(self.buffer, bytearray) 

2606 self.buffer.extend(buf) 

2607 self.index = len(self.buffer) 

2608 

2609 if forced is not None and not fake_forced: 

2610 self.forced_indices.update(range(initial, self.index)) 

2611 

2612 self.blocks.add_endpoint(self.index) 

2613 

2614 assert result.bit_length() <= n 

2615 return result 

2616 
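# --- Editor's sketch (added; not part of the Hypothesis source). The
# high-bit masking from draw_bits in isolation: an n-bit draw fetches
# bits_to_bytes(n) whole bytes, then keeps only n % 8 bits of the top byte
# (all eight when n is a multiple of 8). The mask table here is an
# assumption matching that behaviour; the real BYTE_MASKS is defined
# elsewhere in this module.
_MASKS = [(1 << (i or 8)) - 1 for i in range(8)]  # index 0 means "keep all 8 bits"

_buf = bytearray(b"\xff\xff")
_n = 10                      # a 10-bit draw occupies two bytes
_buf[0] &= _MASKS[_n % 8]    # keep only the low 2 bits of the high byte
assert int.from_bytes(bytes(_buf), "big").bit_length() <= _n
# --- end sketch ---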

2617 def __check_capacity(self, n: int) -> None: 

2618 if self.index + n > self.max_length: 

2619 self.mark_overrun() 

2620 

2621 def conclude_test( 

2622 self, 

2623 status: Status, 

2624 interesting_origin: Optional[InterestingOrigin] = None, 

2625 ) -> NoReturn: 

2626 assert (interesting_origin is None) or (status == Status.INTERESTING) 

2627 self.__assert_not_frozen("conclude_test") 

2628 self.interesting_origin = interesting_origin 

2629 self.status = status 

2630 self.freeze() 

2631 raise StopTest(self.testcounter) 

2632 

2633 def mark_interesting( 

2634 self, interesting_origin: Optional[InterestingOrigin] = None 

2635 ) -> NoReturn: 

2636 self.conclude_test(Status.INTERESTING, interesting_origin) 

2637 

2638 def mark_invalid(self, why: Optional[str] = None) -> NoReturn: 

2639 if why is not None: 

2640 self.events["invalid because"] = why 

2641 self.conclude_test(Status.INVALID) 

2642 

2643 def mark_overrun(self) -> NoReturn: 

2644 self.conclude_test(Status.OVERRUN) 

2645 

2646 

2647def bits_to_bytes(n: int) -> int: 

2648 """The number of bytes required to represent an n-bit number. 

2649 Equivalent to (n + 7) // 8, but slightly faster. This really is 

2650 called enough times that that matters.""" 

2651 return (n + 7) >> 3
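# --- Editor's sketch (added; not part of the Hypothesis source). A quick
# check that the shift really is the ceiling division it claims to be.
assert all((n + 7) >> 3 == (n + 7) // 8 == -(-n // 8) for n in range(4096))
# --- end sketch ---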