Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/shrinker.py: 15%

614 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import math 

12from collections import defaultdict 

13from collections.abc import Callable, Sequence 

14from dataclasses import dataclass 

15from typing import ( 

16 TYPE_CHECKING, 

17 Any, 

18 Literal, 

19 TypeAlias, 

20 cast, 

21) 

22 

23from hypothesis.internal.conjecture.choice import ( 

24 ChoiceNode, 

25 ChoiceT, 

26 choice_equal, 

27 choice_from_index, 

28 choice_key, 

29 choice_permitted, 

30 choice_to_index, 

31) 

32from hypothesis.internal.conjecture.data import ( 

33 ConjectureData, 

34 ConjectureResult, 

35 Spans, 

36 Status, 

37 _Overrun, 

38 draw_choice, 

39) 

40from hypothesis.internal.conjecture.junkdrawer import ( 

41 endswith, 

42 find_integer, 

43 replace_all, 

44 startswith, 

45) 

46from hypothesis.internal.conjecture.shrinking import ( 

47 Bytes, 

48 Float, 

49 Integer, 

50 Ordering, 

51 String, 

52) 

53from hypothesis.internal.conjecture.shrinking.choicetree import ( 

54 ChoiceTree, 

55 prefix_selection_order, 

56 random_selection_order, 

57) 

58from hypothesis.internal.floats import MAX_PRECISE_INTEGER 

59 

60if TYPE_CHECKING: 

61 from random import Random 

62 

63 from hypothesis.internal.conjecture.engine import ConjectureRunner 

64 

65ShrinkPredicateT: TypeAlias = Callable[[ConjectureResult | _Overrun], bool] 

66 

67 

68def sort_key(nodes: Sequence[ChoiceNode]) -> tuple[int, tuple[int, ...]]: 

69 """Returns a sort key such that "simpler" choice sequences are smaller than 

70 "more complicated" ones. 

71 

72 We define sort_key so that x is simpler than y if x is shorter than y or if 

73 they have the same length and map(choice_to_index, x) < map(choice_to_index, y). 

74 

75 The reason for using this ordering is: 

76 

77 1. If x is shorter than y then that means we had to make fewer decisions 

78 in constructing the test case when we ran x than we did when we ran y. 

79 2. If x is the same length as y then replacing a choice with a lower index 

80 choice corresponds to replacing it with a simpler/smaller choice. 

81 3. Because choices drawn early in generation potentially get used in more 

82 places, they potentially have a more significant impact on the final 

83 result, so it makes sense to prioritise reducing earlier choices over 

84 later ones. 
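
    An illustrative sketch (not part of the original docstring) of the same
    ordering applied to plain tuples of choice indices:

    .. code-block:: python

        key = lambda xs: (len(xs), xs)
        # shorter sequences always sort first; equal lengths break ties
        # lexicographically on the per-choice indices
        assert sorted([(2, 2), (0, 5, 0), (1,)], key=key) == [(1,), (2, 2), (0, 5, 0)]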

85 """ 

86 return ( 

87 len(nodes), 

88 tuple(choice_to_index(node.value, node.constraints) for node in nodes), 

89 ) 

90 

91 

92@dataclass(slots=True, frozen=False) 

93class ShrinkPass: 

94 function: Any 

95 name: str | None = None 

96 last_prefix: Any = () 

97 

98 # some execution statistics 

99 calls: int = 0 

100 misaligned: int = 0 

101 shrinks: int = 0 

102 deletions: int = 0 

103 

104 def __post_init__(self): 

105 if self.name is None: 

106 self.name = self.function.__name__ 

107 

108 def __hash__(self): 

109 return hash(self.name) 

110 

111 

112class StopShrinking(Exception): 

113 pass 

114 

115 

116class Shrinker: 

117 """A shrinker is a child object of a ConjectureRunner which is designed to 

118 manage the associated state of a particular shrink problem. That is, we 

119 have some initial ConjectureData object and some property of interest 

120 that it satisfies, and we want to find a ConjectureData object with a 

121 shortlex (see sort_key above) smaller choice sequence that exhibits the same 

122 property. 

123 

124 Currently the only property of interest we use is that the status is 

125 INTERESTING and the interesting_origin takes on some fixed value, but we 

126 may potentially be interested in other use cases later. 

127 However we assume that data with a status < VALID never satisfies the predicate. 

128 

129 The shrinker keeps track of a value shrink_target which represents the 

130 current best known ConjectureData object satisfying the predicate. 

131 It refines this value by repeatedly running *shrink passes*, which are 

132 methods that perform a series of transformations to the current shrink_target 

133 and evaluate the underlying test function to find new ConjectureData 

134 objects. If any of these satisfy the predicate, the shrink_target 

135 is updated automatically. Shrinking runs until no shrink pass can 

136 improve the shrink_target, at which point it stops. It may also be 

137 terminated if the underlying engine throws RunIsComplete, but that 

138 is handled by the calling code rather than the Shrinker. 

139 

140 ======================= 

141 Designing Shrink Passes 

142 ======================= 

143 

144 Generally a shrink pass is just any function that calls 

145 cached_test_function and/or consider_new_nodes a number of times, 

146 but there are a couple of useful things to bear in mind. 

147 

148 A shrink pass *makes progress* if running it changes self.shrink_target 

149 (i.e. it tries a shortlex smaller ConjectureData object satisfying 

150 the predicate). The desired end state of shrinking is to find a 

151 value such that no shrink pass can make progress, i.e. that we 

152 are at a local minimum for each shrink pass. 

153 

154 In aid of this goal, the main invariant that a shrink pass must 

155 satisfy is that whether it makes progress must be deterministic. 

156 It is fine (encouraged even) for the specific progress it makes 

157 to be non-deterministic, but if you run a shrink pass, it makes 

158 no progress, and then you immediately run it again, it should 

159 never succeed the second time. This allows us to stop as soon 

160 as we have run each shrink pass and seen no progress on any of 

161 them. 

162 

163 This means that e.g. it's fine to try each of N deletions 

164 or replacements in a random order, but it's not OK to try N random 

165 deletions (unless you have already shrunk at least once, though we 

166 don't currently take advantage of this loophole). 

167 

168 Shrink passes need to be written so as to be robust against 

169 change in the underlying shrink target. It is generally safe 

170 to assume that the shrink target does not change prior to the 

171 point of first modification - e.g. if you change no bytes at 

172 index ``i``, all spans whose start is ``<= i`` still exist, 

173 as do all blocks, and the data object is still of length 

174 ``>= i + 1``. This can only be violated by bad user code which 

175 relies on an external source of non-determinism. 

176 

177 When the underlying shrink_target changes, shrink 

178 passes should not run substantially more test_function calls 

179 on success than they do on failure. Say, no more than a constant 

180 factor more. In particular shrink passes should not iterate to a 

181 fixed point. 

182 

183 This means that shrink passes are often written with loops that 

184 are carefully designed to do the right thing in the case that no 

185 shrinks occurred and try to adapt to any changes to do a reasonable 

186 job. e.g. say we wanted to write a shrink pass that tried deleting 

187 each individual choice (this isn't an especially good pass, 

188 but it leads to a simple illustrative example), we might do it 

189 by iterating over the choice sequence like so: 

190 

191 .. code-block:: python 

192 

193 i = 0 

194 while i < len(self.shrink_target.nodes): 

195 if not self.consider_new_nodes( 

196 self.shrink_target.nodes[:i] + self.shrink_target.nodes[i + 1 :] 

197 ): 

198 i += 1 

199 

200 The reason for writing the loop this way is that i is always a 

201 valid index into the current choice sequence, even if the current sequence 

202 changes as a result of our actions. When the choice sequence changes, 

203 we leave the index where it is rather than restarting from the 

204 beginning, and carry on. This means that the number of steps we 

205 run in this case is always bounded above by the number of steps 

206 we would run if nothing works. 

207 

208 Another thing to bear in mind about shrink pass design is that 

209 they should prioritise *progress*. If you have N operations that 

210 you need to run, you should try to order them in such a way as 

211 to avoid stalling, where you have long periods of test function 

212 invocations where no shrinks happen. This is bad because whenever 

213 we shrink we reduce the amount of work the shrinker has to do 

214 in future, and often speed up the test function, so we ideally 

215 want those shrinks to happen as early in the process as possible. 

216 

217 Sometimes stalls are inevitable of course - e.g. if the pass 

218 makes no progress, then the entire thing is just one long stall, 

219 but it's helpful to design it so that stalls are less likely 

220 in typical behaviour. 

221 

222 The two easiest ways to do this are: 

223 

224 * Just run the N steps in random order. As long as a 

225 reasonably large proportion of the operations succeed, this 

226 guarantees the expected stall length is quite short. The 

227 bookkeeping for making sure this does the right thing when 

228 it succeeds can be quite annoying. 

229 * When you have any sort of nested loop, loop in such a way 

230 that both loop variables change each time. This prevents 

231 stalls which occur when one particular value for the outer 

232 loop is impossible to make progress on, rendering the entire 

233 inner loop into a stall. 

234 

235 However, although progress is good, too much progress can be 

236 a bad sign! If you're *only* seeing successful reductions, 

237 that's probably a sign that you are making changes that are 

238 too timid. Two useful things to offset this: 

239 

240 * It's worth writing shrink passes which are *adaptive*, in 

241 the sense that when operations seem to be working really 

242 well we try to bundle multiple of them together. This can 

243 often be used to turn what would be O(m) successful calls 

244 into O(log(m)); see the sketch at the end of this docstring. 

245 * It's often worth trying one or two special minimal values 

246 before trying anything more fine grained (e.g. replacing 

247 the whole thing with zero). 

248 
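
    A minimal sketch of the adaptive idea from the first bullet above. This is
    illustrative only and not part of the original docstring: it assumes some
    pass that deletes a block of choices starting at an index ``i``, and uses
    the ``find_integer`` helper imported at the top of this file.

    .. code-block:: python

        # find_integer(f) assumes f(0) is True and finds a k with f(k) True
        # and f(k + 1) False, by exponential probing plus binary search, so
        # k successful deletions cost O(log(k)) calls rather than O(k).
        find_integer(
            lambda k: (
                i + k <= len(self.shrink_target.nodes)
                and self.consider_new_nodes(
                    self.shrink_target.nodes[:i] + self.shrink_target.nodes[i + k :]
                )
            )
        )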

249 """ 

250 

251 def derived_value(fn): 

252 """It's useful during shrinking to have access to derived values of 

253 the current shrink target. 

254 

255 This decorator allows you to define these as cached properties. They 

256 are calculated once, then cached until the shrink target changes, then 

257 recalculated the next time they are used.""" 

258 

259 def accept(self): 

260 try: 

261 return self.__derived_values[fn.__name__] 

262 except KeyError: 

263 return self.__derived_values.setdefault(fn.__name__, fn(self)) 

264 

265 accept.__name__ = fn.__name__ 

266 return property(accept) 
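
# Illustrative usage sketch (added annotation, not part of the original file):
# a derived value is written as a zero-argument method and read as an
# attribute. It is computed on first access and recomputed only after
# update_shrink_target() resets self.__derived_values, so it always reflects
# the current shrink target. The name below is hypothetical.
#
#     @derived_value
#     def span_sizes(self):
#         return [span.choice_count for span in self.spans]
#
#     ...
#     sizes = self.span_sizes  # cached until the next successful shrink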

267 

268 def __init__( 

269 self, 

270 engine: "ConjectureRunner", 

271 initial: ConjectureData | ConjectureResult, 

272 predicate: ShrinkPredicateT | None, 

273 *, 

274 allow_transition: ( 

275 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None 

276 ), 

277 explain: bool, 

278 in_target_phase: bool = False, 

279 ): 

280 """Create a shrinker for a particular engine, with a given starting 

281 point and predicate. When shrink() is called it will attempt to find an 

282 example for which predicate is True and which is strictly smaller than 

283 initial. 

284 

285 Note that initial is a ConjectureData object, and predicate 

286 takes ConjectureData objects. 

287 """ 

288 assert predicate is not None or allow_transition is not None 

289 self.engine = engine 

290 self.__predicate = predicate or (lambda data: True) 

291 self.__allow_transition = allow_transition or (lambda source, destination: True) 

292 self.__derived_values: dict = {} 

293 

294 self.initial_size = len(initial.choices) 

295 # We keep track of the current best example on the shrink_target 

296 # attribute. 

297 self.shrink_target = initial 

298 self.clear_change_tracking() 

299 self.shrinks = 0 

300 

301 # We terminate shrinks that seem to have reached their logical 

302 # conclusion: If we've called the underlying test function at 

303 # least self.max_stall times since the last time we shrunk, 

304 # it's time to stop shrinking. 

305 self.max_stall = 200 

306 self.initial_calls = self.engine.call_count 

307 self.initial_misaligned = self.engine.misaligned_count 

308 self.calls_at_last_shrink = self.initial_calls 

309 

310 self.shrink_passes: list[ShrinkPass] = [ 

311 ShrinkPass(self.try_trivial_spans), 

312 self.node_program("X" * 5), 

313 self.node_program("X" * 4), 

314 self.node_program("X" * 3), 

315 self.node_program("X" * 2), 

316 self.node_program("X" * 1), 

317 ShrinkPass(self.pass_to_descendant), 

318 ShrinkPass(self.reorder_spans), 

319 ShrinkPass(self.minimize_duplicated_choices), 

320 ShrinkPass(self.minimize_individual_choices), 

321 ShrinkPass(self.redistribute_numeric_pairs), 

322 ShrinkPass(self.lower_integers_together), 

323 ShrinkPass(self.lower_duplicated_characters), 

324 ] 

325 

326 # Because the shrinker is also used to `pareto_optimise` in the target phase, 

327 # we sometimes want to allow extending buffers instead of aborting at the end. 

328 self.__extend: Literal["full"] | int = "full" if in_target_phase else 0 

329 self.should_explain = explain 

330 

331 @derived_value # type: ignore 

332 def cached_calculations(self): 

333 return {} 

334 

335 def cached(self, *keys): 

336 def accept(f): 

337 cache_key = (f.__name__, *keys) 

338 try: 

339 return self.cached_calculations[cache_key] 

340 except KeyError: 

341 return self.cached_calculations.setdefault(cache_key, f()) 

342 

343 return accept 

344 

345 @property 

346 def calls(self) -> int: 

347 """Return the number of calls that have been made to the underlying 

348 test function.""" 

349 return self.engine.call_count 

350 

351 @property 

352 def misaligned(self) -> int: 

353 return self.engine.misaligned_count 

354 

355 def check_calls(self) -> None: 

356 if self.calls - self.calls_at_last_shrink >= self.max_stall: 

357 raise StopShrinking 

358 

359 def cached_test_function( 

360 self, nodes: Sequence[ChoiceNode] 

361 ) -> tuple[bool, ConjectureResult | _Overrun | None]: 

362 nodes = nodes[: len(self.nodes)] 

363 

364 if startswith(nodes, self.nodes): 

365 return (True, None) 

366 

367 if sort_key(self.nodes) < sort_key(nodes): 

368 return (False, None) 

369 

370 # sometimes our shrinking passes try obviously invalid things. We handle 

371 # discarding them in one place here. 

372 if any(not choice_permitted(node.value, node.constraints) for node in nodes): 

373 return (False, None) 

374 

375 result = self.engine.cached_test_function( 

376 [n.value for n in nodes], extend=self.__extend 

377 ) 

378 previous = self.shrink_target 

379 self.incorporate_test_data(result) 

380 self.check_calls() 

381 return (previous is not self.shrink_target, result) 

382 

383 def consider_new_nodes(self, nodes: Sequence[ChoiceNode]) -> bool: 

384 return self.cached_test_function(nodes)[0] 

385 

386 def incorporate_test_data(self, data): 

387 """Takes a ConjectureData or Overrun object updates the current 

388 shrink_target if this data represents an improvement over it.""" 

389 if data.status < Status.VALID or data is self.shrink_target: 

390 return 

391 if ( 

392 self.__predicate(data) 

393 and sort_key(data.nodes) < sort_key(self.shrink_target.nodes) 

394 and self.__allow_transition(self.shrink_target, data) 

395 ): 

396 self.update_shrink_target(data) 

397 

398 def debug(self, msg: str) -> None: 

399 self.engine.debug(msg) 

400 

401 @property 

402 def random(self) -> "Random": 

403 return self.engine.random 

404 

405 def shrink(self) -> None: 

406 """Run the full set of shrinks and update shrink_target. 

407 

408 This method is "mostly idempotent" - calling it twice is unlikely to 

409 have any effect, though it has a non-zero probability of doing so. 

410 """ 

411 

412 try: 

413 self.initial_coarse_reduction() 

414 self.greedy_shrink() 

415 except StopShrinking: 

416 # If we stopped shrinking because we're making slow progress (instead of 

417 # reaching a local optimum), don't run the explain-phase logic. 

418 self.should_explain = False 

419 finally: 

420 if self.engine.report_debug_info: 

421 

422 def s(n): 

423 return "s" if n != 1 else "" 

424 

425 total_deleted = self.initial_size - len(self.shrink_target.choices) 

426 calls = self.engine.call_count - self.initial_calls 

427 misaligned = self.engine.misaligned_count - self.initial_misaligned 

428 

429 self.debug( 

430 "---------------------\n" 

431 "Shrink pass profiling\n" 

432 "---------------------\n\n" 

433 f"Shrinking made a total of {calls} call{s(calls)} of which " 

434 f"{self.shrinks} shrank and {misaligned} were misaligned. This " 

435 f"deleted {total_deleted} choices out of {self.initial_size}." 

436 ) 

437 for useful in [True, False]: 

438 self.debug("") 

439 if useful: 

440 self.debug("Useful passes:") 

441 else: 

442 self.debug("Useless passes:") 

443 self.debug("") 

444 for pass_ in sorted( 

445 self.shrink_passes, 

446 key=lambda t: (-t.calls, t.deletions, t.shrinks), 

447 ): 

448 if pass_.calls == 0: 

449 continue 

450 if (pass_.shrinks != 0) != useful: 

451 continue 

452 

453 self.debug( 

454 f" * {pass_.name} made {pass_.calls} call{s(pass_.calls)} of which " 

455 f"{pass_.shrinks} shrank and {pass_.misaligned} were misaligned, " 

456 f"deleting {pass_.deletions} choice{s(pass_.deletions)}." 

457 ) 

458 self.debug("") 

459 self.explain() 

460 

461 def explain(self) -> None: 

462 

463 if not self.should_explain or not self.shrink_target.arg_slices: 

464 return 

465 

466 self.max_stall = 2**100 

467 shrink_target = self.shrink_target 

468 nodes = self.nodes 

469 choices = self.choices 

470 chunks: dict[tuple[int, int], list[tuple[ChoiceT, ...]]] = defaultdict(list) 

471 

472 # Before we start running experiments, let's check for known inputs which would 

473 # make them redundant. The shrinking process means that we've already tried many 

474 # variations on the minimal example, so this can save a lot of time. 

475 seen_passing_seq = self.engine.passing_choice_sequences( 

476 prefix=self.nodes[: min(self.shrink_target.arg_slices)[0]] 

477 ) 

478 

479 # Now that we've shrunk to a minimal failing example, it's time to try 

480 # varying each part that we've noted will go in the final report. Consider 

481 # slices in largest-first order 

482 for start, end in sorted( 

483 self.shrink_target.arg_slices, key=lambda x: (-(x[1] - x[0]), x) 

484 ): 

485 # Check for any previous examples that match the prefix and suffix, 

486 # so we can skip if we found a passing example while shrinking. 

487 if any( 

488 startswith(seen, nodes[:start]) and endswith(seen, nodes[end:]) 

489 for seen in seen_passing_seq 

490 ): 

491 continue 

492 

493 # Skip slices that are subsets of already-explained slices. 

494 # If a larger slice can vary freely, so can its sub-slices. 

495 # Note: (0, 0) is a special marker for the "together" comment that 

496 # applies to the whole test, not a specific slice, so we exclude it. 

497 if any( 

498 s <= start and end <= e 

499 for s, e in self.shrink_target.slice_comments 

500 if (s, e) != (0, 0) 

501 ): 

502 continue 

503 

504 # Run our experiments 

505 n_same_failures = 0 

506 note = "or any other generated value" 

507 # TODO: is 100 same-failures out of 500 attempts a good heuristic? 

508 for n_attempt in range(500): # pragma: no branch 

509 # no-branch here because we don't coverage-test the abort-at-500 logic. 

510 

511 if n_attempt - 10 > n_same_failures * 5: 

512 # stop early if we're seeing mostly invalid examples 

513 break # pragma: no cover 

514 

515 # replace start:end with random values 

516 replacement = [] 

517 for i in range(start, end): 

518 node = nodes[i] 

519 if not node.was_forced: 

520 value = draw_choice( 

521 node.type, node.constraints, random=self.random 

522 ) 

523 node = node.copy(with_value=value) 

524 replacement.append(node.value) 

525 

526 attempt = choices[:start] + tuple(replacement) + choices[end:] 

527 result = self.engine.cached_test_function(attempt, extend="full") 

528 

529 if result.status is Status.OVERRUN: 

530 continue # pragma: no cover # flakily covered 

531 result = cast(ConjectureResult, result) 

532 if not ( 

533 len(attempt) == len(result.choices) 

534 and endswith(result.nodes, nodes[end:]) 

535 ): 

536 # Turns out this was a variable-length part, so grab the infix... 

537 for span1, span2 in zip( 

538 shrink_target.spans, result.spans, strict=False 

539 ): 

540 assert span1.start == span2.start 

541 assert span1.start <= start 

542 assert span1.label == span2.label 

543 if span1.start == start and span1.end == end: 

544 result_end = span2.end 

545 break 

546 else: 

547 raise NotImplementedError("Expected matching prefixes") 

548 

549 attempt = ( 

550 choices[:start] 

551 + result.choices[start:result_end] 

552 + choices[end:] 

553 ) 

554 chunks[(start, end)].append(result.choices[start:result_end]) 

555 result = self.engine.cached_test_function(attempt) 

556 

557 if result.status is Status.OVERRUN: 

558 continue # pragma: no cover # flakily covered 

559 result = cast(ConjectureResult, result) 

560 else: 

561 chunks[(start, end)].append(result.choices[start:end]) 

562 

563 if shrink_target is not self.shrink_target: # pragma: no cover 

564 # If we've shrunk further without meaning to, bail out. 

565 self.shrink_target.slice_comments.clear() 

566 return 

567 if result.status is Status.VALID: 

568 # The test passed, indicating that this param can't vary freely. 

569 # However, it's really hard to write a simple and reliable covering 

570 test, because of our `seen_passing_seq` check above. 

571 break # pragma: no cover 

572 if self.__predicate(result): # pragma: no branch 

573 n_same_failures += 1 

574 if n_same_failures >= 100: 

575 self.shrink_target.slice_comments[(start, end)] = note 

576 break 

577 

578 # Finally, if we've found multiple independently-variable parts, check whether 

579 # they can all be varied together. 

580 if len(self.shrink_target.slice_comments) <= 1: 

581 return 

582 n_same_failures_together = 0 

583 # Only include slices that were actually added to slice_comments 

584 chunks_by_start_index = sorted( 

585 (k, v) for k, v in chunks.items() if k in self.shrink_target.slice_comments 

586 ) 

587 for _ in range(500): # pragma: no branch 

588 # no-branch here because we don't coverage-test the abort-at-500 logic. 

589 new_choices: list[ChoiceT] = [] 

590 prev_end = 0 

591 for (start, end), ls in chunks_by_start_index: 

592 assert prev_end <= start < end, "these chunks must be nonoverlapping" 

593 new_choices.extend(choices[prev_end:start]) 

594 new_choices.extend(self.random.choice(ls)) 

595 prev_end = end 

596 

597 result = self.engine.cached_test_function(new_choices) 

598 

599 # This *can't* be a shrink because none of the components were. 

600 assert shrink_target is self.shrink_target 

601 if result.status == Status.VALID: 

602 self.shrink_target.slice_comments[(0, 0)] = ( 

603 "The test sometimes passed when commented parts were varied together." 

604 ) 

605 break # Test passed, this param can't vary freely. 

606 if self.__predicate(result): # pragma: no branch 

607 n_same_failures_together += 1 

608 if n_same_failures_together >= 100: 

609 self.shrink_target.slice_comments[(0, 0)] = ( 

610 "The test always failed when commented parts were varied together." 

611 ) 

612 break 

613 

614 def greedy_shrink(self) -> None: 

615 """Run a full set of greedy shrinks (that is, ones that will only ever 

616 move to a better target) and update shrink_target appropriately. 

617 

618 This method iterates to a fixed point and so is idempotent - calling 

619 it twice will have exactly the same effect as calling it once. 

620 """ 

621 self.fixate_shrink_passes(self.shrink_passes) 

622 

623 def initial_coarse_reduction(self): 

624 """Performs some preliminary reductions that should not be 

625 repeated as part of the main shrink passes. 

626 

627 The main reason why these can't be included as part of shrink 

628 passes is that they have much more ability to make the test 

629 case "worse". e.g. they might rerandomise part of it, significantly 

630 increasing the value of individual nodes, which works in direct 

631 opposition to the lexical shrinking and will frequently undo 

632 its work. 

633 """ 

634 self.reduce_each_alternative() 

635 

636 @derived_value # type: ignore 

637 def spans_starting_at(self): 

638 result = [[] for _ in self.shrink_target.nodes] 

639 for i, ex in enumerate(self.spans): 

640 # We can have zero-length spans that start at the end 

641 if ex.start < len(result): 

642 result[ex.start].append(i) 

643 return tuple(map(tuple, result)) 

644 

645 def reduce_each_alternative(self): 

646 """This is a pass that is designed to rerandomise use of the 

647 one_of strategy or things that look like it, in order to try 

648 to move from later strategies to earlier ones in the branch 

649 order. 

650 

651 It does this by trying to systematically lower each value it 

652 finds that looks like it might be the branch decision for 

653 one_of, and then attempts to repair any changes in shape that 

654 this causes. 

655 """ 

656 i = 0 

657 while i < len(self.shrink_target.nodes): 

658 nodes = self.shrink_target.nodes 

659 node = nodes[i] 

660 if ( 

661 node.type == "integer" 

662 and not node.was_forced 

663 and node.value <= 10 

664 and node.constraints["min_value"] == 0 

665 ): 

666 assert isinstance(node.value, int) 

667 

668 # We've found a plausible candidate for a ``one_of`` choice. 

669 # We now want to see if the shape of the test case actually depends 

670 # on it. If it doesn't, then we don't need to do this (comparatively 

671 # costly) pass, and can let much simpler lexicographic reduction 

672 # handle it later. 

673 # 

674 # We test this by trying to set the value to zero and seeing if the 

675 # shape changes, as measured by either changing the number of subsequent 

676 # nodes, or changing the nodes in such a way as to cause one of the 

677 # previous values to no longer be valid in its position. 

678 zero_attempt = self.cached_test_function( 

679 nodes[:i] + (nodes[i].copy(with_value=0),) + nodes[i + 1 :] 

680 )[1] 

681 if ( 

682 zero_attempt is not self.shrink_target 

683 and zero_attempt is not None 

684 and zero_attempt.status >= Status.VALID 

685 ): 

686 changed_shape = len(zero_attempt.nodes) != len(nodes) 

687 

688 if not changed_shape: 

689 for j in range(i + 1, len(nodes)): 

690 zero_node = zero_attempt.nodes[j] 

691 orig_node = nodes[j] 

692 if ( 

693 zero_node.type != orig_node.type 

694 or not choice_permitted( 

695 orig_node.value, zero_node.constraints 

696 ) 

697 ): 

698 changed_shape = True 

699 break 

700 if changed_shape: 

701 for v in range(node.value): 

702 if self.try_lower_node_as_alternative(i, v): 

703 break 

704 i += 1 

705 

706 def try_lower_node_as_alternative(self, i, v): 

707 """Attempt to lower `self.shrink_target.nodes[i]` to `v`, 

708 while rerandomising and attempting to repair any subsequent 

709 changes to the shape of the test case that this causes.""" 

710 nodes = self.shrink_target.nodes 

711 if self.consider_new_nodes( 

712 nodes[:i] + (nodes[i].copy(with_value=v),) + nodes[i + 1 :] 

713 ): 

714 return True 

715 

716 prefix = nodes[:i] + (nodes[i].copy(with_value=v),) 

717 initial = self.shrink_target 

718 spans = self.spans_starting_at[i] 

719 for _ in range(3): 

720 random_attempt = self.engine.cached_test_function( 

721 [n.value for n in prefix], extend=len(nodes) 

722 ) 

723 if random_attempt.status < Status.VALID: 

724 continue 

725 self.incorporate_test_data(random_attempt) 

726 for j in spans: 

727 initial_span = initial.spans[j] 

728 attempt_span = random_attempt.spans[j] 

729 contents = random_attempt.nodes[attempt_span.start : attempt_span.end] 

730 self.consider_new_nodes( 

731 nodes[:i] + contents + nodes[initial_span.end :] 

732 ) 

733 if initial is not self.shrink_target: 

734 return True 

735 return False 

736 

737 @derived_value # type: ignore 

738 def shrink_pass_choice_trees(self) -> dict[Any, ChoiceTree]: 

739 return defaultdict(ChoiceTree) 

740 

741 def step(self, shrink_pass: ShrinkPass, *, random_order: bool = False) -> bool: 

742 tree = self.shrink_pass_choice_trees[shrink_pass] 

743 if tree.exhausted: 

744 return False 

745 

746 initial_shrinks = self.shrinks 

747 initial_calls = self.calls 

748 initial_misaligned = self.misaligned 

749 size = len(self.shrink_target.choices) 

750 assert shrink_pass.name is not None 

751 self.engine.explain_next_call_as(shrink_pass.name) 

752 

753 if random_order: 

754 selection_order = random_selection_order(self.random) 

755 else: 

756 selection_order = prefix_selection_order(shrink_pass.last_prefix) 

757 

758 try: 

759 shrink_pass.last_prefix = tree.step( 

760 selection_order, 

761 lambda chooser: shrink_pass.function(chooser), 

762 ) 

763 finally: 

764 shrink_pass.calls += self.calls - initial_calls 

765 shrink_pass.misaligned += self.misaligned - initial_misaligned 

766 shrink_pass.shrinks += self.shrinks - initial_shrinks 

767 shrink_pass.deletions += size - len(self.shrink_target.choices) 

768 self.engine.clear_call_explanation() 

769 return True 

770 

771 def fixate_shrink_passes(self, passes: list[ShrinkPass]) -> None: 

772 """Run steps from each pass in ``passes`` until the current shrink target 

773 is a fixed point of all of them.""" 

774 any_ran = True 

775 while any_ran: 

776 any_ran = False 

777 

778 reordering = {} 

779 

780 # We run remove_discarded after every pass to do cleanup 

781 # keeping track of whether that actually works. Either there is 

782 # no discarded data and it is basically free, or it reliably works 

783 # and deletes data, or it doesn't work. In that latter case we turn 

784 # it off for the rest of this loop through the passes, but will 

785 # try again once all of the passes have been run. 

786 can_discard = self.remove_discarded() 

787 

788 calls_at_loop_start = self.calls 

789 

790 # We keep track of how many calls can be made by a single step 

791 # without making progress and use this to test how much to pad 

792 # out self.max_stall by as we go along. 

793 max_calls_per_failing_step = 1 

794 

795 for sp in passes: 

796 if can_discard: 

797 can_discard = self.remove_discarded() 

798 

799 before_sp = self.shrink_target 

800 

801 # Run the shrink pass until it fails to make any progress 

802 # max_failures times in a row. This implicitly boosts shrink 

803 # passes that are more likely to work. 

804 failures = 0 

805 max_failures = 20 

806 while failures < max_failures: 

807 # We don't allow more than max_stall consecutive failures 

808 # to shrink, but this means that if we're unlucky and the 

809 # shrink passes are in a bad order where only the ones at 

810 # the end are useful, if we're not careful this heuristic 

811 # might stop us before we've tried everything. In order to 

812 # avoid that happening, we make sure that there's always 

813 # plenty of breathing room to make it through a single 

814 # iteration of the fixate_shrink_passes loop. 

815 self.max_stall = max( 

816 self.max_stall, 

817 2 * max_calls_per_failing_step 

818 + (self.calls - calls_at_loop_start), 

819 ) 

820 

821 prev = self.shrink_target 

822 initial_calls = self.calls 

823 # It's better for us to run shrink passes in a deterministic 

824 # order, to avoid repeat work, but this can cause us to create 

825 # long stalls when there are a lot of steps which fail to do 

826 # anything useful. In order to avoid this, once we've noticed 

827 # we're in a stall (i.e. half of max_failures calls have failed 

828 # to do anything) we switch to randomly jumping around. If we 

829 # find a success then we'll resume deterministic order from 

830 # there which, with any luck, is in a new good region. 

831 if not self.step(sp, random_order=failures >= max_failures // 2): 

832 # step returns False when there is nothing to do because 

833 # the entire choice tree is exhausted. If this happens 

834 # we break because we literally can't run this pass any 

835 # more than we already have until something else makes 

836 # progress. 

837 break 

838 any_ran = True 

839 

840 # Don't count steps that didn't actually try to do 

841 # anything as failures. Otherwise, this call is a failure 

842 # if it failed to make any changes to the shrink target. 

843 if initial_calls != self.calls: 

844 if prev is not self.shrink_target: 

845 failures = 0 

846 else: 

847 max_calls_per_failing_step = max( 

848 max_calls_per_failing_step, self.calls - initial_calls 

849 ) 

850 failures += 1 

851 

852 # We reorder the shrink passes so that on our next run through 

853 # we try good ones first. The rule is that shrink passes that 

854 # did nothing useful are the worst, shrink passes that reduced 

855 # the length are the best. 

856 if self.shrink_target is before_sp: 

857 reordering[sp] = 1 

858 elif len(self.choices) < len(before_sp.choices): 

859 reordering[sp] = -1 

860 else: 

861 reordering[sp] = 0 

862 

863 passes.sort(key=reordering.__getitem__) 

864 

865 @property 

866 def nodes(self) -> tuple[ChoiceNode, ...]: 

867 return self.shrink_target.nodes 

868 

869 @property 

870 def choices(self) -> tuple[ChoiceT, ...]: 

871 return self.shrink_target.choices 

872 

873 @property 

874 def spans(self) -> Spans: 

875 return self.shrink_target.spans 

876 

877 @derived_value # type: ignore 

878 def spans_by_label(self): 

879 """ 

880 A mapping of labels to a list of spans with that label. Spans in the list 

881 are ordered by their normal index order. 

882 """ 

883 

884 spans_by_label = defaultdict(list) 

885 for ex in self.spans: 

886 spans_by_label[ex.label].append(ex) 

887 return dict(spans_by_label) 

888 

889 @derived_value # type: ignore 

890 def distinct_labels(self): 

891 return sorted(self.spans_by_label, key=str) 

892 

893 def pass_to_descendant(self, chooser): 

894 """Attempt to replace each span with a descendant span. 

895 

896 This is designed to deal with strategies that call themselves 

897 recursively. For example, suppose we had: 

898 

899 binary_tree = st.deferred( 

900 lambda: st.one_of( 

901 st.integers(), st.tuples(binary_tree, binary_tree))) 

902 

903 This pass guarantees that we can replace any binary tree with one of 

904 its subtrees - each of those will create an interval that the parent 

905 could validly be replaced with, and this pass will try doing that. 

906 

907 This is pretty expensive - it takes O(len(intervals)^2) - so we run it 

908 late in the process when we've got the number of intervals as far down 

909 as possible. 

910 """ 

911 

912 label = chooser.choose( 

913 self.distinct_labels, lambda l: len(self.spans_by_label[l]) >= 2 

914 ) 

915 

916 spans = self.spans_by_label[label] 

917 i = chooser.choose(range(len(spans) - 1)) 

918 ancestor = spans[i] 

919 

920 if i + 1 == len(spans) or spans[i + 1].start >= ancestor.end: 

921 return 

922 

923 @self.cached(label, i) 

924 def descendants(): 

925 lo = i + 1 

926 hi = len(spans) 

927 while lo + 1 < hi: 

928 mid = (lo + hi) // 2 

929 if spans[mid].start >= ancestor.end: 

930 hi = mid 

931 else: 

932 lo = mid 

933 return [ 

934 span 

935 for span in spans[i + 1 : hi] 

936 if span.choice_count < ancestor.choice_count 

937 ] 

938 

939 descendant = chooser.choose(descendants, lambda ex: ex.choice_count > 0) 

940 

941 assert ancestor.start <= descendant.start 

942 assert ancestor.end >= descendant.end 

943 assert descendant.choice_count < ancestor.choice_count 

944 

945 self.consider_new_nodes( 

946 self.nodes[: ancestor.start] 

947 + self.nodes[descendant.start : descendant.end] 

948 + self.nodes[ancestor.end :] 

949 ) 

950 

951 def lower_common_node_offset(self): 

952 """Sometimes we find ourselves in a situation where changes to one part 

953 of the choice sequence unlock changes to other parts. Sometimes this is 

954 good, but sometimes this can cause us to exhibit exponential slow 

955 downs! 

956 

957 e.g. suppose we had the following: 

958 

959 m = draw(integers(min_value=0)) 

960 n = draw(integers(min_value=0)) 

961 assert abs(m - n) > 1 

962 

963 If this fails then we'll end up with a loop where on each iteration we 

964 reduce each of m and n by 2 - m can't go lower because of n, then n 

965 can't go lower because of m. 

966 

967 This will take us O(m) iterations to complete, which is exponential in 

968 the data size, as we gradually zig zag our way towards zero. 

969 

970 This can only happen if we're failing to reduce the size of the choice 

971 sequence: The number of iterations that reduce the length of the choice 

972 sequence is bounded by that length. 

973 

974 So what we do is this: We keep track of which nodes are changing, and 

975 then if there's some non-zero common offset to them we try and minimize 

976 them all at once by lowering that offset. 

977 

978 This may not work, and it definitely won't get us out of all possible 

979 exponential slow downs (an example of where it doesn't is where the 

980 shape of the nodes changes as a result of this bouncing behaviour), 

981 but it fails fast when it doesn't work and gets us out of a really 

982 nastily slow case when it does. 

983 """ 

984 if len(self.__changed_nodes) <= 1: 

985 return 

986 

987 changed = [] 

988 for i in sorted(self.__changed_nodes): 

989 node = self.nodes[i] 

990 if node.trivial or node.type != "integer": 

991 continue 

992 changed.append(node) 

993 

994 if not changed: 

995 return 

996 

997 ints = [ 

998 abs(node.value - node.constraints["shrink_towards"]) for node in changed 

999 ] 

1000 offset = min(ints) 

1001 assert offset > 0 

1002 

1003 for i in range(len(ints)): 

1004 ints[i] -= offset 

1005 

1006 st = self.shrink_target 

1007 

1008 def offset_node(node, n): 

1009 return ( 

1010 node.index, 

1011 node.index + 1, 

1012 [node.copy(with_value=node.constraints["shrink_towards"] + n)], 

1013 ) 

1014 

1015 def consider(n, sign): 

1016 return self.consider_new_nodes( 

1017 replace_all( 

1018 st.nodes, 

1019 [ 

1020 offset_node(node, sign * (n + v)) 

1021 for node, v in zip(changed, ints, strict=False) 

1022 ], 

1023 ) 

1024 ) 

1025 

1026 # shrink from both sides 

1027 Integer.shrink(offset, lambda n: consider(n, 1)) 

1028 Integer.shrink(offset, lambda n: consider(n, -1)) 

1029 self.clear_change_tracking() 

1030 

1031 def clear_change_tracking(self): 

1032 self.__last_checked_changed_at = self.shrink_target 

1033 self.__all_changed_nodes = set() 

1034 

1035 def mark_changed(self, i): 

1036 self.__changed_nodes.add(i) 

1037 

1038 @property 

1039 def __changed_nodes(self) -> set[int]: 

1040 if self.__last_checked_changed_at is self.shrink_target: 

1041 return self.__all_changed_nodes 

1042 

1043 prev_target = self.__last_checked_changed_at 

1044 new_target = self.shrink_target 

1045 assert prev_target is not new_target 

1046 prev_nodes = prev_target.nodes 

1047 new_nodes = new_target.nodes 

1048 assert sort_key(new_target.nodes) < sort_key(prev_target.nodes) 

1049 

1050 if len(prev_nodes) != len(new_nodes) or any( 

1051 n1.type != n2.type for n1, n2 in zip(prev_nodes, new_nodes, strict=True) 

1052 ): 

1053 # should we check constraints are equal as well? 

1054 self.__all_changed_nodes = set() 

1055 else: 

1056 assert len(prev_nodes) == len(new_nodes) 

1057 for i, (n1, n2) in enumerate(zip(prev_nodes, new_nodes, strict=True)): 

1058 assert n1.type == n2.type 

1059 if not choice_equal(n1.value, n2.value): 

1060 self.__all_changed_nodes.add(i) 

1061 

1062 return self.__all_changed_nodes 

1063 

1064 def update_shrink_target(self, new_target): 

1065 assert isinstance(new_target, ConjectureResult) 

1066 self.shrinks += 1 

1067 # If we are just taking a long time to shrink we don't want to 

1068 # trigger this heuristic, so whenever we shrink successfully 

1069 # we give ourselves a bit of breathing room to make sure we 

1070 # would find a shrink that took that long to find the next time. 

1071 # The case where we're taking a long time but making steady 

1072 # progress is handled by `finish_shrinking_deadline` in engine.py 

1073 self.max_stall = max( 

1074 self.max_stall, (self.calls - self.calls_at_last_shrink) * 2 

1075 ) 

1076 self.calls_at_last_shrink = self.calls 

1077 self.shrink_target = new_target 

1078 self.__derived_values = {} 

1079 

1080 def try_shrinking_nodes(self, nodes, n): 

1081 """Attempts to replace each node in the nodes list with n. Returns 

1082 True if it succeeded (which may include some additional modifications 

1083 to shrink_target). 

1084 

1085 In current usage it is expected that each of the nodes currently have 

1086 the same value and choice_type, although this is not essential. Note that 

1087 n must be < the node at min(nodes) or this is not a valid shrink. 

1088 

1089 This method will attempt to do some small amount of work to delete data 

1090 that occurs after the end of the nodes. This is useful for cases where 

1091 there is some size dependency on the value of a node. 

1092 """ 

1093 # If the length of the shrink target has changed from under us such that 

1094 # the indices are out of bounds, give up on the replacement. 

1095 # TODO_BETTER_SHRINK: we probably want to narrow down the root cause here at some point. 

1096 if any(node.index >= len(self.nodes) for node in nodes): 

1097 return # pragma: no cover 

1098 

1099 initial_attempt = replace_all( 

1100 self.nodes, 

1101 [(node.index, node.index + 1, [node.copy(with_value=n)]) for node in nodes], 

1102 ) 

1103 

1104 attempt = self.cached_test_function(initial_attempt)[1] 

1105 

1106 if attempt is None: 

1107 return False 

1108 

1109 if attempt is self.shrink_target: 

1110 # if the initial shrink was a success, try lowering offsets. 

1111 self.lower_common_node_offset() 

1112 return True 

1113 

1114 # If this produced something completely invalid we ditch it 

1115 # here rather than trying to persevere. 

1116 if attempt.status is Status.OVERRUN: 

1117 return False 

1118 

1119 if attempt.status is Status.INVALID: 

1120 return False 

1121 

1122 if attempt.misaligned_at is not None: 

1123 # we're invalid due to a misalignment in the tree. We'll try to fix 

1124 # a very specific type of misalignment here: where we have a node of 

1125 # {"size": n} and tried to draw the same node, but with {"size": m < n}. 

1126 # This can occur with eg 

1127 # 

1128 # n = data.draw_integer() 

1129 # s = data.draw_string(min_size=n) 

1130 # 

1131 # where we try lowering n, resulting in the test_function drawing a lower 

1132 # min_size than our attempt had for the draw_string node. 

1133 # 

1134 # We'll now try realigning this tree by: 

1135 # * replacing the constraints in our attempt with what test_function tried 

1136 # to draw in practice 

1137 # * truncating the value of that node to match min_size 

1138 # 

1139 # This helps in the specific case of drawing a value and then drawing 

1140 # a collection of that size...and not much else. In practice this 

1141 # helps because this antipattern is fairly common. 

1142 

1143 # TODO we'll probably want to apply the same trick as in the valid 

1144 # case of this function of preserving from the right instead of 

1145 # preserving from the left. see test_can_shrink_variable_string_draws. 

1146 

1147 index, attempt_choice_type, attempt_constraints, _attempt_forced = ( 

1148 attempt.misaligned_at 

1149 ) 

1150 node = self.nodes[index] 

1151 if node.type != attempt_choice_type: 

1152 return False # pragma: no cover 

1153 if node.was_forced: 

1154 return False # pragma: no cover 

1155 

1156 if node.type in {"string", "bytes"}: 

1157 # if the size *increased*, we would have to guess what to pad with 

1158 # in order to try fixing up this attempt. Just give up. 

1159 if node.constraints["min_size"] <= attempt_constraints["min_size"]: 

1160 # attempts which increase min_size tend to overrun rather than 

1161 # be misaligned, making a covering case difficult. 

1162 return False # pragma: no cover 

1163 # the size decreased in our attempt. Try again, but truncate the value 

1164 # to that size by removing any elements past min_size. 

1165 return self.consider_new_nodes( 

1166 initial_attempt[: node.index] 

1167 + [ 

1168 initial_attempt[node.index].copy( 

1169 with_constraints=attempt_constraints, 

1170 with_value=initial_attempt[node.index].value[ 

1171 : attempt_constraints["min_size"] 

1172 ], 

1173 ) 

1174 ] 

1175 + initial_attempt[node.index :] 

1176 ) 

1177 

1178 lost_nodes = len(self.nodes) - len(attempt.nodes) 

1179 if lost_nodes <= 0: 

1180 return False 

1181 

1182 start = nodes[0].index 

1183 end = nodes[-1].index + 1 

1184 # We now look for contiguous regions to delete that might help fix up 

1185 # this failed shrink. We only look for contiguous regions of the right 

1186 # lengths because doing anything more than that starts to get very 

1187 # expensive. See minimize_individual_choices for where we 

1188 # try to be more aggressive. 

1189 regions_to_delete = {(end, end + lost_nodes)} 

1190 

1191 for ex in self.spans: 

1192 if ex.start > start: 

1193 continue 

1194 if ex.end <= end: 

1195 continue 

1196 

1197 if ex.index >= len(attempt.spans): 

1198 continue # pragma: no cover 

1199 

1200 replacement = attempt.spans[ex.index] 

1201 in_original = [c for c in ex.children if c.start >= end] 

1202 in_replaced = [c for c in replacement.children if c.start >= end] 

1203 

1204 if len(in_replaced) >= len(in_original) or not in_replaced: 

1205 continue 

1206 

1207 # We've found a span where some of the children went missing 

1208 # as a result of this change, and just replacing it with the data 

1209 # it would have had and removing the spillover didn't work. This 

1210 # means that some of its children towards the right must be 

1211 # important, so we try to arrange it so that it retains its 

1212 # rightmost children instead of its leftmost. 

1213 regions_to_delete.add( 

1214 (in_original[0].start, in_original[-len(in_replaced)].start) 

1215 ) 

1216 

1217 for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True): 

1218 try_with_deleted = initial_attempt[:u] + initial_attempt[v:] 

1219 if self.consider_new_nodes(try_with_deleted): 

1220 return True 

1221 

1222 return False 

1223 

1224 def remove_discarded(self): 

1225 """Try removing all bytes marked as discarded. 

1226 

1227 This is primarily to deal with data that has been ignored while 

1228 doing rejection sampling - e.g. as a result of an integer range, or a 

1229 filtered strategy. 

1230 

1231 Such data will also be handled by the adaptive_example_deletion pass, 

1232 but that pass is necessarily more conservative and will try deleting 

1233 each interval individually. The common case is that all data drawn and 

1234 rejected can just be thrown away immediately in one block, so this pass 

1235 will be much faster than trying each one individually when it works. 

1236 

1237 returns False if there is discarded data and removing it does not work, 

1238 otherwise returns True. 

1239 """ 

1240 while self.shrink_target.has_discards: 

1241 discarded = [] 

1242 

1243 for ex in self.shrink_target.spans: 

1244 if ( 

1245 ex.choice_count > 0 

1246 and ex.discarded 

1247 and (not discarded or ex.start >= discarded[-1][-1]) 

1248 ): 

1249 discarded.append((ex.start, ex.end)) 

1250 

1251 # This can happen if we have discards but they are all of 

1252 # zero length. This shouldn't happen very often so it's 

1253 # faster to check for it here than at the point of example 

1254 # generation. 

1255 if not discarded: 

1256 break 

1257 

1258 attempt = list(self.nodes) 

1259 for u, v in reversed(discarded): 

1260 del attempt[u:v] 

1261 

1262 if not self.consider_new_nodes(tuple(attempt)): 

1263 return False 

1264 return True 

1265 

1266 @derived_value # type: ignore 

1267 def duplicated_nodes(self): 

1268 """Returns a list of nodes grouped (choice_type, value).""" 

1269 duplicates = defaultdict(list) 

1270 for node in self.nodes: 

1271 duplicates[(node.type, choice_key(node.value))].append(node) 

1272 return list(duplicates.values()) 

1273 

1274 def node_program(self, program: str) -> ShrinkPass: 

1275 return ShrinkPass( 

1276 lambda chooser: self._node_program(chooser, program), 

1277 name=f"node_program_{program}", 

1278 ) 
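
# Added annotation (not part of the original file): a node program is a short
# string of per-node operations applied to consecutive nodes starting at a
# chosen index. The only programs built above consist of "X" characters, where
# "X" asks run_node_program (defined later in this file) to delete that node,
# so e.g. node_program("XX") is a pass that tries deleting two adjacent nodes
# at a time; _node_program below applies it adaptively via find_integer.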

1279 

1280 def _node_program(self, chooser, program): 

1281 n = len(program) 

1282 # Adaptively attempt to run the node program at the current 

1283 # index. If this successfully applies the node program ``k`` times 

1284 # then this runs in ``O(log(k))`` test function calls. 

1285 i = chooser.choose(range(len(self.nodes) - n + 1)) 

1286 

1287 # First, run the node program at the chosen index. If this fails, 

1288 # don't do any extra work, so that failure is as cheap as possible. 

1289 if not self.run_node_program(i, program, original=self.shrink_target): 

1290 return 

1291 

1292 # Because we run in a random order we will often find ourselves in the middle 

1293 # of a region where we could run the node program. We thus start by moving 

1294 left to the beginning of that region if possible, so that the program 

1295 is applied from the start of the region. 

1296 def offset_left(k): 

1297 return i - k * n 

1298 

1299 i = offset_left( 

1300 find_integer( 

1301 lambda k: self.run_node_program( 

1302 offset_left(k), program, original=self.shrink_target 

1303 ) 

1304 ) 

1305 ) 

1306 

1307 original = self.shrink_target 

1308 # Now try to run the node program multiple times here. 

1309 find_integer( 

1310 lambda k: self.run_node_program(i, program, original=original, repeats=k) 

1311 ) 

1312 

1313 def minimize_duplicated_choices(self, chooser): 

1314 """Find choices that have been duplicated in multiple places and attempt 

1315 to minimize all of the duplicates simultaneously. 

1316 

1317 This lets us handle cases where two values can't be shrunk 

1318 independently of each other but can easily be shrunk together. 

1319 For example if we had something like: 

1320 

1321 ls = data.draw(lists(integers())) 

1322 y = data.draw(integers()) 

1323 assert y not in ls 

1324 

1325 Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were 

1326 to replace both 3s with 0, this would be a valid shrink, but if we were 

1327 to replace either 3 with 0 on its own the test would start passing. 

1328 

1329 It is also useful for when that duplication is accidental and the value 

1330 of the choices don't matter very much because it allows us to replace 

1331 more values at once. 

1332 """ 

1333 nodes = chooser.choose(self.duplicated_nodes) 

1334 # we can't lower any nodes which are trivial. try proceeding with the 

1335 # remaining nodes. 

1336 nodes = [node for node in nodes if not node.trivial] 

1337 if len(nodes) <= 1: 

1338 return 

1339 

1340 self.minimize_nodes(nodes) 

1341 

1342 def redistribute_numeric_pairs(self, chooser): 

1343 """If there is a sum of generated numbers that we need their sum 

1344 to exceed some bound, lowering one of them requires raising the 

1345 other. This pass enables that.""" 
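
# Illustrative example (added annotation, not part of the original file) of
# the pattern this pass targets: a test like
#
#     m = draw(integers(min_value=0))
#     n = draw(integers(min_value=0))
#     assert m + n < 100
#
# fails exactly when m + n >= 100, so neither value can be lowered on its own
# without the failure disappearing. Lowering m by k while raising n by k keeps
# the sum (and hence the failure) intact, which is what boost() below attempts.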

1346 

1347 # look for a pair of nodes (node1, node2) which are both numeric 

1348 # and aren't separated by too many other nodes. We'll decrease node1 and 

1349 # increase node2 (note that the other way around doesn't make sense as 

1350 # it's strictly worse in the ordering). 

1351 def can_choose_node(node): 

1352 # don't choose nan, inf, or floats above the threshold where f + 1 > f 

1353 # (which is not necessarily true for floats above MAX_PRECISE_INTEGER). 

1354 # The motivation for the last condition is to avoid trying weird 

1355 # non-shrinks where we raise one node and think we lowered another 

1356 # (but didn't). 

1357 return node.type in {"integer", "float"} and not ( 

1358 node.type == "float" 

1359 and (math.isnan(node.value) or abs(node.value) >= MAX_PRECISE_INTEGER) 

1360 ) 

1361 

1362 node1 = chooser.choose( 

1363 self.nodes, 

1364 lambda node: can_choose_node(node) and not node.trivial, 

1365 ) 

1366 node2 = chooser.choose( 

1367 self.nodes, 

1368 lambda node: can_choose_node(node) 

1369 # Note that it's fine for node2 to be trivial, because we're going to 

1370 # explicitly make it *not* trivial by adding to its value. 

1371 and not node.was_forced 

1372 # to avoid quadratic behavior, scan ahead only a small amount for 

1373 # the related node. 

1374 and node1.index < node.index <= node1.index + 4, 

1375 ) 

1376 

1377 m: int | float = node1.value 

1378 n: int | float = node2.value 

1379 

1380 def boost(k: int) -> bool: 

1381 # floats always shrink towards 0 

1382 shrink_towards = ( 

1383 node1.constraints["shrink_towards"] if node1.type == "integer" else 0 

1384 ) 

1385 if k > abs(m - shrink_towards): 

1386 return False 

1387 

1388 # We are trying to move node1 (m) closer to shrink_towards, and node2 

1389 # (n) farther away from shrink_towards. If m is below shrink_towards, 

1390 # we want to add to m and subtract from n, and vice versa if above 

1391 # shrink_towards. 

1392 if m < shrink_towards: 

1393 k = -k 

1394 

1395 try: 

1396 v1 = m - k 

1397 v2 = n + k 

1398 except OverflowError: # pragma: no cover 

1399 # if n or m is a float and k is over sys.float_info.max, coercing 

1400 # k to a float will overflow. 

1401 return False 

1402 

1403 # if we've increased node2 to the point that we're past max precision, 

1404 # give up - things have become too unstable. 

1405 if node1.type == "float" and abs(v2) >= MAX_PRECISE_INTEGER: 

1406 return False 

1407 

1408 return self.consider_new_nodes( 

1409 self.nodes[: node1.index] 

1410 + (node1.copy(with_value=v1),) 

1411 + self.nodes[node1.index + 1 : node2.index] 

1412 + (node2.copy(with_value=v2),) 

1413 + self.nodes[node2.index + 1 :] 

1414 ) 

1415 

1416 find_integer(boost) 

1417 

1418 def lower_integers_together(self, chooser): 

1419 node1 = chooser.choose( 

1420 self.nodes, lambda n: n.type == "integer" and not n.trivial 

1421 ) 

1422 # Search up to 3 nodes ahead, to avoid quadratic time. 

1423 node2 = self.nodes[ 

1424 chooser.choose( 

1425 range(node1.index + 1, min(len(self.nodes), node1.index + 3 + 1)), 

1426 lambda i: self.nodes[i].type == "integer" 

1427 and not self.nodes[i].was_forced, 

1428 ) 

1429 ] 

1430 

1431 # one might expect us to require node2 to be nontrivial, and to minimize 

1432 # the node which is closer to its shrink_towards, rather than node1 

1433 # unconditionally. In reality, it's acceptable for us to transition node2 

1434 # from trivial to nontrivial, because the shrink ordering is dominated by 

1435 # the complexity of the earlier node1. What matters is minimizing node1. 

1436 shrink_towards = node1.constraints["shrink_towards"] 

1437 

1438 def consider(n): 

1439 return self.consider_new_nodes( 

1440 self.nodes[: node1.index] 

1441 + (node1.copy(with_value=node1.value - n),) 

1442 + self.nodes[node1.index + 1 : node2.index] 

1443 + (node2.copy(with_value=node2.value - n),) 

1444 + self.nodes[node2.index + 1 :] 

1445 ) 

1446 

1447 find_integer(lambda n: consider(shrink_towards - n)) 

1448 find_integer(lambda n: consider(n - shrink_towards)) 

1449 

1450 def lower_duplicated_characters(self, chooser): 

1451 """ 

1452 Select two string choices no more than 4 choices apart and simultaneously 

1453 lower characters which appear in both strings. This helps cases where the 

1454 same character must appear in two strings, but the actual value of the 

1455 character is not relevant. 

1456 

1457 This shrinking pass currently only tries lowering *all* instances of the 

1458 duplicated character in both strings. So for instance, given two choices: 

1459 

1460 "bbac" 

1461 "abbb" 

1462 

1463 we would try lowering all five of the b characters simultaneously. This 

1464 may fail to shrink some cases where only certain character indices are 

1465 correlated, for instance if only the b at index 1 could be lowered 

1466 simultaneously and the rest did in fact have to be a `b`. 

1467 

1468 It would be nice to try shrinking that case as well, but we would need good 

1469 safeguards because it could get very expensive to try all combinations. 

1470 I expect lowering all duplicates to handle most cases in the meantime. 

1471 """ 

1472 node1 = chooser.choose( 

1473 self.nodes, lambda n: n.type == "string" and not n.trivial 

1474 ) 

1475 

1476 # limit search to up to 4 choices ahead, to avoid quadratic behavior 

1477 node2 = self.nodes[ 

1478 chooser.choose( 

1479 range(node1.index + 1, min(len(self.nodes), node1.index + 1 + 4)), 

1480 lambda i: self.nodes[i].type == "string" and not self.nodes[i].trivial 

1481 # select nodes which have at least one of the same character present 

1482 and set(node1.value) & set(self.nodes[i].value), 

1483 ) 

1484 ] 

1485 

1486 duplicated_characters = set(node1.value) & set(node2.value) 

1487 # deterministic ordering 

1488 char = chooser.choose(sorted(duplicated_characters)) 

1489 intervals = node1.constraints["intervals"] 

1490 

1491 def copy_node(node, n): 

1492 # replace all duplicate characters in each string. This might miss 

1493 # some shrinks compared to only replacing some, but trying all possible 

1494 # combinations of indices could get expensive if done without some 

1495 # thought. 

1496 return node.copy( 

1497 with_value=node.value.replace(char, intervals.char_in_shrink_order(n)) 

1498 ) 

1499 

1500 Integer.shrink( 

1501 intervals.index_from_char_in_shrink_order(char), 

1502 lambda n: self.consider_new_nodes( 

1503 self.nodes[: node1.index] 

1504 + (copy_node(node1, n),) 

1505 + self.nodes[node1.index + 1 : node2.index] 

1506 + (copy_node(node2, n),) 

1507 + self.nodes[node2.index + 1 :] 

1508 ), 

1509 ) 
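        # Continuing the docstring's example (purely illustrative): if the
        # duplicated character is "b" and the intervals place "a" earlier in
        # shrink order, a successful step could rewrite "bbac" and "abbb" to
        # "aaac" and "aaaa" simultaneously, which minimizing either string
        # choice on its own would not attempt.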

1510 

1511 def minimize_nodes(self, nodes): 

1512 choice_type = nodes[0].type 

1513 value = nodes[0].value 

1514 # unlike choice_type and value, constraints are *not* guaranteed to be equal among all 

1515 # passed nodes. We arbitrarily use the constraints of the first node. I think 

1516 # this is unsound (= leads to us trying shrinks that could not have been 

1517 # generated), but those get discarded at test-time, and this enables useful 

1518 # slips where constraints are not equal but are close enough that doing the 

1519 # same operation on both basically just works. 

1520 constraints = nodes[0].constraints 

1521 assert all( 

1522 node.type == choice_type and choice_equal(node.value, value) 

1523 for node in nodes 

1524 ) 

1525 

1526 if choice_type == "integer": 

1527 shrink_towards = constraints["shrink_towards"] 

1528 # try shrinking from both sides towards shrink_towards. 

1529 # we're starting from n = abs(shrink_towards - value). Because the 

1530 # shrinker will not check its starting value, we need to try 

1531 # shrinking to n first. 
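            # For example (illustrative values): with value == 17 and
            # shrink_towards == 5 we start from n == 12, so the candidates
            # explored are 5 + n and 5 - n for successively smaller n,
            # approaching 5 from both sides.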

1532 self.try_shrinking_nodes(nodes, abs(shrink_towards - value)) 

1533 Integer.shrink( 

1534 abs(shrink_towards - value), 

1535 lambda n: self.try_shrinking_nodes(nodes, shrink_towards + n), 

1536 ) 

1537 Integer.shrink( 

1538 abs(shrink_towards - value), 

1539 lambda n: self.try_shrinking_nodes(nodes, shrink_towards - n), 

1540 ) 

1541 elif choice_type == "float": 

1542 self.try_shrinking_nodes(nodes, abs(value)) 

1543 Float.shrink( 

1544 abs(value), 

1545 lambda val: self.try_shrinking_nodes(nodes, val), 

1546 ) 

1547 Float.shrink( 

1548 abs(value), 

1549 lambda val: self.try_shrinking_nodes(nodes, -val), 

1550 ) 

1551 elif choice_type == "boolean": 

1552 # must be True, otherwise would be trivial and not selected. 

1553 assert value is True 

1554 # only one thing to try: false! 

1555 self.try_shrinking_nodes(nodes, False) 

1556 elif choice_type == "bytes": 

1557 Bytes.shrink( 

1558 value, 

1559 lambda val: self.try_shrinking_nodes(nodes, val), 

1560 min_size=constraints["min_size"], 

1561 ) 

1562 elif choice_type == "string": 

1563 String.shrink( 

1564 value, 

1565 lambda val: self.try_shrinking_nodes(nodes, val), 

1566 intervals=constraints["intervals"], 

1567 min_size=constraints["min_size"], 

1568 ) 

1569 else: 

1570 raise NotImplementedError 

1571 

1572 def try_trivial_spans(self, chooser): 

1573 i = chooser.choose(range(len(self.spans))) 

1574 

1575 prev = self.shrink_target 

1576 nodes = self.shrink_target.nodes 

1577 span = self.spans[i] 

1578 prefix = nodes[: span.start] 

1579 replacement = tuple( 

1580 [ 

1581 ( 

1582 node 

1583 if node.was_forced 

1584 else node.copy( 

1585 with_value=choice_from_index(0, node.type, node.constraints) 

1586 ) 

1587 ) 

1588 for node in nodes[span.start : span.end] 

1589 ] 

1590 ) 

1591 suffix = nodes[span.end :] 

1592 attempt = self.cached_test_function(prefix + replacement + suffix)[1] 

1593 

1594 if self.shrink_target is not prev: 

1595 return 

1596 

1597 if isinstance(attempt, ConjectureResult): 

1598 new_span = attempt.spans[i] 

1599 new_replacement = attempt.nodes[new_span.start : new_span.end] 

1600 self.consider_new_nodes(prefix + new_replacement + suffix) 

1601 

1602 def minimize_individual_choices(self, chooser): 

1603 """Attempt to minimize each choice in sequence. 

1604 

1605 This is the pass that ensures that e.g. each integer we draw is a 

1606 minimum value. So it's the part that guarantees that if we e.g. do 

1607 

1608 x = data.draw(integers()) 

1609 assert x < 10 

1610 

1611 then in our shrunk example, x = 10 rather than say 97. 

1612 

1613 If we are unsuccessful at minimizing a choice of interest we then 

1614 check if that's because it's changing the size of the test case and, 

1615 if so, we also make an attempt to delete parts of the test case to 

1616 see if that fixes it. 

1617 

1618 We handle most of the common cases in try_shrinking_nodes which is 

1619 pretty good at clearing out large contiguous blocks of dead space, 

1620 but it fails when there is data that has to stay in particular places 

1621 in the list. 

1622 """ 

1623 node = chooser.choose(self.nodes, lambda node: not node.trivial) 

1624 initial_target = self.shrink_target 

1625 

1626 self.minimize_nodes([node]) 

1627 if self.shrink_target is not initial_target: 

1628 # the shrink target changed, so our shrink worked. Defer doing 

1629 # anything more intelligent until this shrink fails. 

1630 return 

1631 

1632 # the shrink failed. One particularly common case where minimizing a 

1633 # node can fail is the antipattern of drawing a size and then drawing a 

1634 # collection of that size, or more generally when there is a size 

1635 # dependency on some single node. We'll explicitly try and fix up this 

1636 # common case here: if decreasing an integer node by one would reduce 

1637 # the size of the generated input, we'll try deleting things after that 

1638 # node and see if the resulting attempt works. 
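        # The classic shape of that antipattern, sketched here with
        # hypothetical strategies, is something like:
        #
        #     n = data.draw(st.integers(min_value=0, max_value=10))
        #     xs = [data.draw(st.booleans()) for _ in range(n)]
        #
        # where lowering n alone changes how many draws follow it, so the
        # attempt only works if the now-surplus trailing choices are deleted too.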

1639 

1640 if node.type != "integer": 

1641 # Only try this fixup logic on integer draws. Almost all size 

1642 # dependencies are on integer draws, and if this one is not, the test is doing 

1643 # something convoluted enough that it is unlikely to shrink well anyway. 

1644 # TODO: extend to floats? we probably currently fail on the following, 

1645 # albeit convoluted example: 

1646 # n = int(data.draw(st.floats())) 

1647 # s = data.draw(st.lists(st.integers(), min_size=n, max_size=n)) 

1648 return 

1649 

1650 lowered = ( 

1651 self.nodes[: node.index] 

1652 + (node.copy(with_value=node.value - 1),) 

1653 + self.nodes[node.index + 1 :] 

1654 ) 

1655 attempt = self.cached_test_function(lowered)[1] 

1656 if ( 

1657 attempt is None 

1658 or attempt.status < Status.VALID 

1659 or len(attempt.nodes) == len(self.nodes) 

1660 or len(attempt.nodes) == node.index + 1 

1661 ): 

1662 # no point in trying our size-dependency-logic if our attempt at 

1663 # lowering the node resulted in: 

1664 # * an invalid conjecture data 

1665 # * the same number of nodes as before 

1666 # * no nodes beyond the lowered node (nothing to try to delete afterwards) 

1667 return 

1668 

1669 # If the attempt were our current shrink target then the original shrink 

1670 # should have worked and we could never have got here. 

1671 assert attempt is not self.shrink_target 

1672 

1673 @self.cached(node.index) 

1674 def first_span_after_node(): 

1675 lo = 0 

1676 hi = len(self.spans) 

1677 while lo + 1 < hi: 

1678 mid = (lo + hi) // 2 

1679 span = self.spans[mid] 

1680 if span.start >= node.index: 

1681 hi = mid 

1682 else: 

1683 lo = mid 

1684 return hi 

1685 

1686 # we try deleting both entire spans, and single nodes. 

1687 # If we wanted to get more aggressive, we could try deleting n 

1688 # consecutive nodes (that don't cross a span boundary) for say 

1689 # n <= 2 or n <= 3. 

1690 if chooser.choose([True, False]): 

1691 span = self.spans[ 

1692 chooser.choose( 

1693 range(first_span_after_node, len(self.spans)), 

1694 lambda i: self.spans[i].choice_count > 0, 

1695 ) 

1696 ] 

1697 self.consider_new_nodes(lowered[: span.start] + lowered[span.end :]) 

1698 else: 

1699 node = self.nodes[chooser.choose(range(node.index + 1, len(self.nodes)))] 

1700 self.consider_new_nodes(lowered[: node.index] + lowered[node.index + 1 :]) 

1701 

1702 def reorder_spans(self, chooser): 

1703 """This pass allows us to reorder the children of each span. 

1704 

1705 For example, consider the following: 

1706 

1707 .. code-block:: python 

1708 

1709 import hypothesis.strategies as st 

1710 from hypothesis import given 

1711 

1712 

1713 @given(st.text(), st.text()) 

1714 def test_not_equal(x, y): 

1715 assert x != y 

1716 

1717 Without the ability to reorder x and y this could fail either with 

1718 ``x=""``, ``y="0"``, or the other way around. With reordering it will 

1719 reliably fail with ``x=""``, ``y="0"``. 

1720 """ 

1721 span = chooser.choose(self.spans) 

1722 

1723 label = chooser.choose(span.children).label 

1724 spans = [c for c in span.children if c.label == label] 

1725 if len(spans) <= 1: 

1726 return 

1727 

1728 endpoints = [(span.start, span.end) for span in spans] 

1729 st = self.shrink_target 

1730 

1731 Ordering.shrink( 

1732 range(len(spans)), 

1733 lambda indices: self.consider_new_nodes( 

1734 replace_all( 

1735 st.nodes, 

1736 [ 

1737 ( 

1738 u, 

1739 v, 

1740 st.nodes[spans[i].start : spans[i].end], 

1741 ) 

1742 for (u, v), i in zip(endpoints, indices, strict=True) 

1743 ], 

1744 ) 

1745 ), 

1746 key=lambda i: sort_key(st.nodes[spans[i].start : spans[i].end]), 

1747 ) 
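        # As a hypothetical illustration: if three sibling spans with the same
        # label currently encode the strings "zz", "" and "a", Ordering.shrink
        # tries permutations whose contents sort as ("", "a", "zz") under
        # sort_key, and a reordering is kept only if the test still fails.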

1748 

1749 def run_node_program(self, i, program, original, repeats=1): 

1750 """Node programs are a mini-DSL for node rewriting, defined as a sequence 

1751 of commands that can be run at some index into the nodes 

1752 

1753 Commands are: 

1754 

1755 * "X", delete this node 

1756 

1757 This method runs the node program in ``program`` at node index 

1758 ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it 

1759 will attempt to approximate the results of running it that many times. 

1760 

1761 Returns True if this successfully changes the underlying shrink target, 

1762 else False. 

1763 """ 

1764 if i + len(program) > len(original.nodes) or i < 0: 

1765 return False 

1766 attempt = list(original.nodes) 

1767 for _ in range(repeats): 

1768 for k, command in reversed(list(enumerate(program))): 

1769 j = i + k 

1770 if j >= len(attempt): 

1771 return False 

1772 

1773 if command == "X": 

1774 del attempt[j] 

1775 else: 

1776 raise NotImplementedError(f"Unrecognised command {command!r}") 

1777 

1778 return self.consider_new_nodes(attempt)