
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import math
from collections import defaultdict
from collections.abc import Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast

from hypothesis.internal.conjecture.choice import (
    ChoiceNode,
    ChoiceT,
    choice_equal,
    choice_from_index,
    choice_key,
    choice_permitted,
    choice_to_index,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    Spans,
    Status,
    _Overrun,
    draw_choice,
)
from hypothesis.internal.conjecture.junkdrawer import (
    endswith,
    find_integer,
    replace_all,
    startswith,
)
from hypothesis.internal.conjecture.shrinking import (
    Bytes,
    Float,
    Integer,
    Ordering,
    String,
)
from hypothesis.internal.conjecture.shrinking.choicetree import (
    ChoiceTree,
    prefix_selection_order,
    random_selection_order,
)
from hypothesis.internal.floats import MAX_PRECISE_INTEGER

if TYPE_CHECKING:
    from random import Random
    from typing import TypeAlias

    from hypothesis.internal.conjecture.engine import ConjectureRunner

ShrinkPredicateT: "TypeAlias" = Callable[[Union[ConjectureResult, _Overrun]], bool]


def sort_key(nodes: Sequence[ChoiceNode]) -> tuple[int, tuple[int, ...]]:
    """Returns a sort key such that "simpler" choice sequences are smaller than
    "more complicated" ones.

    We define sort_key so that x is simpler than y if x is shorter than y or if
    they have the same length and map(choice_to_index, x) < map(choice_to_index, y).

    The reason for using this ordering is:

    1. If x is shorter than y then that means we had to make fewer decisions
       in constructing the test case when we ran x than we did when we ran y.
    2. If x is the same length as y then replacing a choice with a lower index
       choice corresponds to replacing it with a simpler/smaller choice.
    3. Because choices drawn early in generation potentially get used in more
       places they potentially have a more significant impact on the final
       result, so it makes sense to prioritise reducing earlier choices over
       later ones.
    """
    return (
        len(nodes),
        tuple(choice_to_index(node.value, node.constraints) for node in nodes),
    )

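# For example (an illustrative sketch, not part of the original module): a
# sequence of two choices whose indices are (0, 3) has sort_key (2, (0, 3)),
# while a sequence of three zero-index choices has sort_key (3, (0, 0, 0)).
# The two-choice sequence is "simpler" purely because it is shorter,
# regardless of the index values.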


@dataclass
class ShrinkPass:
    function: Any
    name: Optional[str] = None
    last_prefix: Any = ()

    # some execution statistics
    calls: int = 0
    misaligned: int = 0
    shrinks: int = 0
    deletions: int = 0

    def __post_init__(self):
        if self.name is None:
            self.name = self.function.__name__

    def __hash__(self):
        return hash(self.name)


class StopShrinking(Exception):
    pass


class Shrinker:
    """A shrinker is a child object of a ConjectureRunner which is designed to
    manage the associated state of a particular shrink problem. That is, we
    have some initial ConjectureData object and some property of interest
    that it satisfies, and we want to find a ConjectureData object with a
    shortlex (see sort_key above) smaller choice sequence that exhibits the same
    property.

    Currently the only property of interest we use is that the status is
    INTERESTING and the interesting_origin takes on some fixed value, but we
    may potentially be interested in other use cases later.
    However we assume that data with a status < VALID never satisfies the predicate.

    The shrinker keeps track of a value shrink_target which represents the
    current best known ConjectureData object satisfying the predicate.
    It refines this value by repeatedly running *shrink passes*, which are
    methods that perform a series of transformations to the current shrink_target
    and evaluate the underlying test function to find new ConjectureData
    objects. If any of these satisfy the predicate, the shrink_target
    is updated automatically. Shrinking runs until no shrink pass can
    improve the shrink_target, at which point it stops. It may also be
    terminated if the underlying engine throws RunIsComplete, but that
    is handled by the calling code rather than the Shrinker.

    =======================
    Designing Shrink Passes
    =======================

    Generally a shrink pass is just any function that calls
    cached_test_function and/or consider_new_nodes a number of times,
    but there are a couple of useful things to bear in mind.

    A shrink pass *makes progress* if running it changes self.shrink_target
    (i.e. it finds a shortlex smaller ConjectureData object satisfying
    the predicate). The desired end state of shrinking is to find a
    value such that no shrink pass can make progress, i.e. that we
    are at a local minimum for each shrink pass.

    In aid of this goal, the main invariant that a shrink pass must
    satisfy is that whether it makes progress must be deterministic.
    It is fine (encouraged even) for the specific progress it makes
    to be non-deterministic, but if you run a shrink pass, it makes
    no progress, and then you immediately run it again, it should
    never succeed on the second time. This allows us to stop as soon
    as we have run each shrink pass and seen no progress on any of
    them.

    This means that e.g. it's fine to try each of N deletions
    or replacements in a random order, but it's not OK to try N random
    deletions (unless you have already shrunk at least once, though we
    don't currently take advantage of this loophole).

    Shrink passes need to be written so as to be robust against
    change in the underlying shrink target. It is generally safe
    to assume that the shrink target does not change prior to the
    point of first modification - e.g. if you make no changes before
    index ``i``, all spans whose start is ``<= i`` still exist, and the
    choice sequence is still of length ``>= i + 1``. This can only be
    violated by bad user code which relies on an external source of
    non-determinism.

    When the underlying shrink_target changes, shrink
    passes should not run substantially more test_function calls
    on success than they do on failure. Say, no more than a constant
    factor more. In particular shrink passes should not iterate to a
    fixed point.

    This means that shrink passes are often written with loops that
    are carefully designed to do the right thing in the case that no
    shrinks occurred and try to adapt to any changes to do a reasonable
    job. e.g. say we wanted to write a shrink pass that tried deleting
    each individual choice (this isn't an especially good pass,
    but it leads to a simple illustrative example), we might do it
    by iterating over the choice sequence like so:

    .. code-block:: python

        i = 0
        while i < len(self.shrink_target.nodes):
            if not self.consider_new_nodes(
                self.shrink_target.nodes[:i] + self.shrink_target.nodes[i + 1 :]
            ):
                i += 1

    The reason for writing the loop this way is that i is always a
    valid index into the current choice sequence, even if the current sequence
    changes as a result of our actions. When the choice sequence changes,
    we leave the index where it is rather than restarting from the
    beginning, and carry on. This means that the number of steps we
    run in this case is always bounded above by the number of steps
    we would run if nothing works.

    Another thing to bear in mind about shrink pass design is that
    they should prioritise *progress*. If you have N operations that
    you need to run, you should try to order them in such a way as
    to avoid stalling, where you have long periods of test function
    invocations where no shrinks happen. This is bad because whenever
    we shrink we reduce the amount of work the shrinker has to do
    in future, and often speed up the test function, so we ideally
    want those shrinks to happen much earlier in the process.

    Sometimes stalls are inevitable of course - e.g. if the pass
    makes no progress, then the entire thing is just one long stall,
    but it's helpful to design it so that stalls are less likely
    in typical behaviour.

    The two easiest ways to do this are:

    * Just run the N steps in random order. As long as a
      reasonably large proportion of the operations succeed, this
      guarantees the expected stall length is quite short. The
      bookkeeping for making sure this does the right thing when
      it succeeds can be quite annoying.
    * When you have any sort of nested loop, loop in such a way
      that both loop variables change each time. This prevents
      stalls which occur when one particular value for the outer
      loop is impossible to make progress on, rendering the entire
      inner loop into a stall.

    However, although progress is good, too much progress can be
    a bad sign! If you're *only* seeing successful reductions,
    that's probably a sign that you are making changes that are
    too timid. Two useful things to offset this:

    * It's worth writing shrink passes which are *adaptive*, in
      the sense that when operations seem to be working really
      well we try to bundle multiple of them together. This can
      often be used to turn what would be O(m) successful calls
      into O(log(m)). See the sketch below for an example.
    * It's often worth trying one or two special minimal values
      before trying anything more fine grained (e.g. replacing
      the whole thing with zero).
    """

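    # An illustrative sketch (not part of the original file) of the *adaptive*
    # advice above: bundle consecutive deletions together with find_integer so
    # that k successful deletions at one index cost O(log(k)) test function
    # calls rather than O(k).
    #
    #     def adaptive_delete_example(self):
    #         i = 0
    #         while i < len(self.shrink_target.nodes):
    #             nodes = self.shrink_target.nodes
    #             # Largest k such that deleting nodes[i : i + k] still
    #             # satisfies the predicate (k == 0 trivially succeeds).
    #             find_integer(
    #                 lambda k: self.consider_new_nodes(nodes[:i] + nodes[i + k :])
    #             )
    #             i += 1
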

    def derived_value(fn):
        """It's useful during shrinking to have access to derived values of
        the current shrink target.

        This decorator allows you to define these as cached properties. They
        are calculated once, then cached until the shrink target changes, then
        recalculated the next time they are used."""

        def accept(self):
            try:
                return self.__derived_values[fn.__name__]
            except KeyError:
                return self.__derived_values.setdefault(fn.__name__, fn(self))

        accept.__name__ = fn.__name__
        return property(accept)

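    # Usage sketch (illustrative; ``nonempty_spans`` is a hypothetical name,
    # but the pattern matches the real uses further down):
    #
    #     @derived_value
    #     def nonempty_spans(self):
    #         return [ex for ex in self.spans if ex.choice_count > 0]
    #
    # The value is computed on first access and thrown away whenever
    # update_shrink_target replaces self.shrink_target, which resets
    # self.__derived_values to {}.
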

    def __init__(
        self,
        engine: "ConjectureRunner",
        initial: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT],
        *,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ],
        explain: bool,
        in_target_phase: bool = False,
    ):
        """Create a shrinker for a particular engine, with a given starting
        point and predicate. When shrink() is called it will attempt to find an
        example for which predicate is True and which is strictly smaller than
        initial.

        Note that initial is a ConjectureData object, and predicate
        takes ConjectureData objects.
        """
        assert predicate is not None or allow_transition is not None
        self.engine = engine
        self.__predicate = predicate or (lambda data: True)
        self.__allow_transition = allow_transition or (lambda source, destination: True)
        self.__derived_values: dict = {}

        self.initial_size = len(initial.choices)
        # We keep track of the current best example on the shrink_target
        # attribute.
        self.shrink_target = initial
        self.clear_change_tracking()
        self.shrinks = 0

        # We terminate shrinks that seem to have reached their logical
        # conclusion: If we've called the underlying test function at
        # least self.max_stall times since the last time we shrunk,
        # it's time to stop shrinking.
        self.max_stall = 200
        self.initial_calls = self.engine.call_count
        self.initial_misaligned = self.engine.misaligned_count
        self.calls_at_last_shrink = self.initial_calls

        self.shrink_passes: list[ShrinkPass] = [
            ShrinkPass(self.try_trivial_spans),
            self.node_program("X" * 5),
            self.node_program("X" * 4),
            self.node_program("X" * 3),
            self.node_program("X" * 2),
            self.node_program("X" * 1),
            ShrinkPass(self.pass_to_descendant),
            ShrinkPass(self.reorder_spans),
            ShrinkPass(self.minimize_duplicated_choices),
            ShrinkPass(self.minimize_individual_choices),
            ShrinkPass(self.redistribute_numeric_pairs),
            ShrinkPass(self.lower_integers_together),
            ShrinkPass(self.lower_duplicated_characters),
        ]

        # Because the shrinker is also used to `pareto_optimise` in the target phase,
        # we sometimes want to allow extending buffers instead of aborting at the end.
        self.__extend: Union[Literal["full"], int] = "full" if in_target_phase else 0
        self.should_explain = explain


    @derived_value  # type: ignore
    def cached_calculations(self):
        return {}

    def cached(self, *keys):
        def accept(f):
            cache_key = (f.__name__, *keys)
            try:
                return self.cached_calculations[cache_key]
            except KeyError:
                return self.cached_calculations.setdefault(cache_key, f())

        return accept

    @property
    def calls(self) -> int:
        """Return the number of calls that have been made to the underlying
        test function."""
        return self.engine.call_count

    @property
    def misaligned(self) -> int:
        return self.engine.misaligned_count

    def check_calls(self) -> None:
        if self.calls - self.calls_at_last_shrink >= self.max_stall:
            raise StopShrinking

    def cached_test_function(
        self, nodes: Sequence[ChoiceNode]
    ) -> tuple[bool, Optional[Union[ConjectureResult, _Overrun]]]:
        nodes = nodes[: len(self.nodes)]

        if startswith(nodes, self.nodes):
            return (True, None)

        if sort_key(self.nodes) < sort_key(nodes):
            return (False, None)

        # sometimes our shrinking passes try obviously invalid things. We handle
        # discarding them in one place here.
        if any(not choice_permitted(node.value, node.constraints) for node in nodes):
            return (False, None)

        result = self.engine.cached_test_function(
            [n.value for n in nodes], extend=self.__extend
        )
        previous = self.shrink_target
        self.incorporate_test_data(result)
        self.check_calls()
        return (previous is not self.shrink_target, result)

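    # Note (an illustrative summary, not original commentary): the early
    # returns above mean that re-proposing the current choice sequence counts
    # as an automatic success, proposing anything shortlex-larger than it
    # counts as an automatic failure, and proposals containing impermissible
    # values fail - all without spending a test function call.
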

    def consider_new_nodes(self, nodes: Sequence[ChoiceNode]) -> bool:
        return self.cached_test_function(nodes)[0]

    def incorporate_test_data(self, data):
        """Takes a ConjectureData or Overrun object and updates the current
        shrink_target if this data represents an improvement over it."""
        if data.status < Status.VALID or data is self.shrink_target:
            return
        if (
            self.__predicate(data)
            and sort_key(data.nodes) < sort_key(self.shrink_target.nodes)
            and self.__allow_transition(self.shrink_target, data)
        ):
            self.update_shrink_target(data)

    def debug(self, msg: str) -> None:
        self.engine.debug(msg)

    @property
    def random(self) -> "Random":
        return self.engine.random


    def shrink(self) -> None:
        """Run the full set of shrinks and update shrink_target.

        This method is "mostly idempotent" - calling it a second time is
        unlikely to have any further effect, though it has a non-zero
        probability of doing so.
        """

        try:
            self.initial_coarse_reduction()
            self.greedy_shrink()
        except StopShrinking:
            # If we stopped shrinking because we're making slow progress (instead of
            # reaching a local optimum), don't run the explain-phase logic.
            self.should_explain = False
        finally:
            if self.engine.report_debug_info:

                def s(n):
                    return "s" if n != 1 else ""

                total_deleted = self.initial_size - len(self.shrink_target.choices)
                calls = self.engine.call_count - self.initial_calls
                misaligned = self.engine.misaligned_count - self.initial_misaligned

                self.debug(
                    "---------------------\n"
                    "Shrink pass profiling\n"
                    "---------------------\n\n"
                    f"Shrinking made a total of {calls} call{s(calls)} of which "
                    f"{self.shrinks} shrank and {misaligned} were misaligned. This "
                    f"deleted {total_deleted} choices out of {self.initial_size}."
                )
                for useful in [True, False]:
                    self.debug("")
                    if useful:
                        self.debug("Useful passes:")
                    else:
                        self.debug("Useless passes:")
                    self.debug("")
                    for pass_ in sorted(
                        self.shrink_passes,
                        key=lambda t: (-t.calls, t.deletions, t.shrinks),
                    ):
                        if pass_.calls == 0:
                            continue
                        if (pass_.shrinks != 0) != useful:
                            continue

                        self.debug(
                            f"  * {pass_.name} made {pass_.calls} call{s(pass_.calls)} of which "
                            f"{pass_.shrinks} shrank and {pass_.misaligned} were misaligned, "
                            f"deleting {pass_.deletions} choice{s(pass_.deletions)}."
                        )
                self.debug("")
                self.explain()


    def explain(self) -> None:
        if not self.should_explain or not self.shrink_target.arg_slices:
            return

        self.max_stall = 2**100
        shrink_target = self.shrink_target
        nodes = self.nodes
        choices = self.choices
        chunks: dict[tuple[int, int], list[tuple[ChoiceT, ...]]] = defaultdict(list)

        # Before we start running experiments, let's check for known inputs which would
        # make them redundant. The shrinking process means that we've already tried many
        # variations on the minimal example, so this can save a lot of time.
        seen_passing_seq = self.engine.passing_choice_sequences(
            prefix=self.nodes[: min(self.shrink_target.arg_slices)[0]]
        )

        # Now that we've shrunk to a minimal failing example, it's time to try
        # varying each part that we've noted will go in the final report. Consider
        # slices in largest-first order
        for start, end in sorted(
            self.shrink_target.arg_slices, key=lambda x: (-(x[1] - x[0]), x)
        ):
            # Check for any previous examples that match the prefix and suffix,
            # so we can skip if we found a passing example while shrinking.
            if any(
                startswith(seen, nodes[:start]) and endswith(seen, nodes[end:])
                for seen in seen_passing_seq
            ):
                continue

            # Run our experiments
            n_same_failures = 0
            note = "or any other generated value"
            # TODO: is 100 same-failures out of 500 attempts a good heuristic?
            for n_attempt in range(500):  # pragma: no branch
                # no-branch here because we don't coverage-test the abort-at-500 logic.

                if n_attempt - 10 > n_same_failures * 5:
                    # stop early if we're seeing mostly invalid examples
                    break  # pragma: no cover

                # replace start:end with random values
                replacement = []
                for i in range(start, end):
                    node = nodes[i]
                    if not node.was_forced:
                        value = draw_choice(
                            node.type, node.constraints, random=self.random
                        )
                        node = node.copy(with_value=value)
                    replacement.append(node.value)

                attempt = choices[:start] + tuple(replacement) + choices[end:]
                result = self.engine.cached_test_function(attempt, extend="full")

                if result.status is Status.OVERRUN:
                    continue  # pragma: no cover  # flakily covered
                result = cast(ConjectureResult, result)
                if not (
                    len(attempt) == len(result.choices)
                    and endswith(result.nodes, nodes[end:])
                ):
                    # Turns out this was a variable-length part, so grab the infix...
                    for span1, span2 in zip(shrink_target.spans, result.spans):
                        assert span1.start == span2.start
                        assert span1.start <= start
                        assert span1.label == span2.label
                        if span1.start == start and span1.end == end:
                            result_end = span2.end
                            break
                    else:
                        raise NotImplementedError("Expected matching prefixes")

                    attempt = (
                        choices[:start]
                        + result.choices[start:result_end]
                        + choices[end:]
                    )
                    chunks[(start, end)].append(result.choices[start:result_end])
                    result = self.engine.cached_test_function(attempt)

                    if result.status is Status.OVERRUN:
                        continue  # pragma: no cover  # flakily covered
                    result = cast(ConjectureResult, result)
                else:
                    chunks[(start, end)].append(result.choices[start:end])

                if shrink_target is not self.shrink_target:  # pragma: no cover
                    # If we've shrunk further without meaning to, bail out.
                    self.shrink_target.slice_comments.clear()
                    return
                if result.status is Status.VALID:
                    # The test passed, indicating that this param can't vary freely.
                    # However, it's really hard to write a simple and reliable covering
                    # test, because of our `seen_passing_seq` check above.
                    break  # pragma: no cover
                if self.__predicate(result):  # pragma: no branch
                    n_same_failures += 1
                    if n_same_failures >= 100:
                        self.shrink_target.slice_comments[(start, end)] = note
                        break

        # Finally, if we've found multiple independently-variable parts, check whether
        # they can all be varied together.
        if len(self.shrink_target.slice_comments) <= 1:
            return
        n_same_failures_together = 0
        chunks_by_start_index = sorted(chunks.items())
        for _ in range(500):  # pragma: no branch
            # no-branch here because we don't coverage-test the abort-at-500 logic.
            new_choices: list[ChoiceT] = []
            prev_end = 0
            for (start, end), ls in chunks_by_start_index:
                assert prev_end <= start < end, "these chunks must be nonoverlapping"
                new_choices.extend(choices[prev_end:start])
                new_choices.extend(self.random.choice(ls))
                prev_end = end

            result = self.engine.cached_test_function(new_choices)

            # This *can't* be a shrink because none of the components were.
            assert shrink_target is self.shrink_target
            if result.status == Status.VALID:
                self.shrink_target.slice_comments[(0, 0)] = (
                    "The test sometimes passed when commented parts were varied together."
                )
                break  # Test passed, this param can't vary freely.
            if self.__predicate(result):  # pragma: no branch
                n_same_failures_together += 1
                if n_same_failures_together >= 100:
                    self.shrink_target.slice_comments[(0, 0)] = (
                        "The test always failed when commented parts were varied together."
                    )
                    break


    def greedy_shrink(self) -> None:
        """Run a full set of greedy shrinks (that is, ones that will only ever
        move to a better target) and update shrink_target appropriately.

        This method iterates to a fixed point and so is idempotent - calling
        it twice will have exactly the same effect as calling it once.
        """
        self.fixate_shrink_passes(self.shrink_passes)

    def initial_coarse_reduction(self):
        """Performs some preliminary reductions that should not be
        repeated as part of the main shrink passes.

        The main reason why these can't be included as part of shrink
        passes is that they have much more ability to make the test
        case "worse". e.g. they might rerandomise part of it, significantly
        increasing the value of individual nodes, which works in direct
        opposition to the lexical shrinking and will frequently undo
        its work.
        """
        self.reduce_each_alternative()

    @derived_value  # type: ignore
    def spans_starting_at(self):
        result = [[] for _ in self.shrink_target.nodes]
        for i, ex in enumerate(self.spans):
            # We can have zero-length spans that start at the end
            if ex.start < len(result):
                result[ex.start].append(i)
        return tuple(map(tuple, result))

    def reduce_each_alternative(self):
        """This is a pass that is designed to rerandomise use of the
        one_of strategy or things that look like it, in order to try
        to move from later strategies to earlier ones in the branch
        order.

        It does this by trying to systematically lower each value it
        finds that looks like it might be the branch decision for
        one_of, and then attempts to repair any changes in shape that
        this causes.
        """
        i = 0
        while i < len(self.shrink_target.nodes):
            nodes = self.shrink_target.nodes
            node = nodes[i]
            if (
                node.type == "integer"
                and not node.was_forced
                and node.value <= 10
                and node.constraints["min_value"] == 0
            ):
                assert isinstance(node.value, int)

                # We've found a plausible candidate for a ``one_of`` choice.
                # We now want to see if the shape of the test case actually depends
                # on it. If it doesn't, then we don't need to do this (comparatively
                # costly) pass, and can let much simpler lexicographic reduction
                # handle it later.
                #
                # We test this by trying to set the value to zero and seeing if the
                # shape changes, as measured by either changing the number of subsequent
                # nodes, or changing the nodes in such a way as to cause one of the
                # previous values to no longer be valid in its position.
                zero_attempt = self.cached_test_function(
                    nodes[:i] + (nodes[i].copy(with_value=0),) + nodes[i + 1 :]
                )[1]
                if (
                    zero_attempt is not self.shrink_target
                    and zero_attempt is not None
                    and zero_attempt.status >= Status.VALID
                ):
                    changed_shape = len(zero_attempt.nodes) != len(nodes)

                    if not changed_shape:
                        for j in range(i + 1, len(nodes)):
                            zero_node = zero_attempt.nodes[j]
                            orig_node = nodes[j]
                            if (
                                zero_node.type != orig_node.type
                                or not choice_permitted(
                                    orig_node.value, zero_node.constraints
                                )
                            ):
                                changed_shape = True
                                break
                    if changed_shape:
                        for v in range(node.value):
                            if self.try_lower_node_as_alternative(i, v):
                                break
            i += 1

    def try_lower_node_as_alternative(self, i, v):
        """Attempt to lower `self.shrink_target.nodes[i]` to `v`,
        while rerandomising and attempting to repair any subsequent
        changes to the shape of the test case that this causes."""
        nodes = self.shrink_target.nodes
        if self.consider_new_nodes(
            nodes[:i] + (nodes[i].copy(with_value=v),) + nodes[i + 1 :]
        ):
            return True

        prefix = nodes[:i] + (nodes[i].copy(with_value=v),)
        initial = self.shrink_target
        spans = self.spans_starting_at[i]
        for _ in range(3):
            random_attempt = self.engine.cached_test_function(
                [n.value for n in prefix], extend=len(nodes)
            )
            if random_attempt.status < Status.VALID:
                continue
            self.incorporate_test_data(random_attempt)
            for j in spans:
                initial_ex = initial.spans[j]
                attempt_ex = random_attempt.spans[j]
                contents = random_attempt.nodes[attempt_ex.start : attempt_ex.end]
                self.consider_new_nodes(nodes[:i] + contents + nodes[initial_ex.end :])
                if initial is not self.shrink_target:
                    return True
        return False


    @derived_value  # type: ignore
    def shrink_pass_choice_trees(self) -> dict[Any, ChoiceTree]:
        return defaultdict(ChoiceTree)

    def step(self, shrink_pass: ShrinkPass, *, random_order: bool = False) -> bool:
        tree = self.shrink_pass_choice_trees[shrink_pass]
        if tree.exhausted:
            return False

        initial_shrinks = self.shrinks
        initial_calls = self.calls
        initial_misaligned = self.misaligned
        size = len(self.shrink_target.choices)
        assert shrink_pass.name is not None
        self.engine.explain_next_call_as(shrink_pass.name)

        if random_order:
            selection_order = random_selection_order(self.random)
        else:
            selection_order = prefix_selection_order(shrink_pass.last_prefix)

        try:
            shrink_pass.last_prefix = tree.step(
                selection_order,
                lambda chooser: shrink_pass.function(chooser),
            )
        finally:
            shrink_pass.calls += self.calls - initial_calls
            shrink_pass.misaligned += self.misaligned - initial_misaligned
            shrink_pass.shrinks += self.shrinks - initial_shrinks
            shrink_pass.deletions += size - len(self.shrink_target.choices)
            self.engine.clear_call_explanation()
        return True


    def fixate_shrink_passes(self, passes: list[ShrinkPass]) -> None:
        """Run steps from each pass in ``passes`` until the current shrink target
        is a fixed point of all of them."""
        any_ran = True
        while any_ran:
            any_ran = False

            reordering = {}

            # We run remove_discarded after every pass to do cleanup,
            # keeping track of whether that actually works. Either there is
            # no discarded data and it is basically free, or it reliably works
            # and deletes data, or it doesn't work. In the latter case we turn
            # it off for the rest of this loop through the passes, but will
            # try again once all of the passes have been run.
            can_discard = self.remove_discarded()

            calls_at_loop_start = self.calls

            # We keep track of how many calls can be made by a single step
            # without making progress and use this to test how much to pad
            # out self.max_stall by as we go along.
            max_calls_per_failing_step = 1

            for sp in passes:
                if can_discard:
                    can_discard = self.remove_discarded()

                before_sp = self.shrink_target

                # Run the shrink pass until it fails to make any progress
                # max_failures times in a row. This implicitly boosts shrink
                # passes that are more likely to work.
                failures = 0
                max_failures = 20
                while failures < max_failures:
                    # We don't allow more than max_stall consecutive failures
                    # to shrink, but this means that if we're unlucky and the
                    # shrink passes are in a bad order where only the ones at
                    # the end are useful, if we're not careful this heuristic
                    # might stop us before we've tried everything. In order to
                    # avoid that happening, we make sure that there's always
                    # plenty of breathing room to make it through a single
                    # iteration of the fixate_shrink_passes loop.
                    self.max_stall = max(
                        self.max_stall,
                        2 * max_calls_per_failing_step
                        + (self.calls - calls_at_loop_start),
                    )

                    prev = self.shrink_target
                    initial_calls = self.calls
                    # It's better for us to run shrink passes in a deterministic
                    # order, to avoid repeat work, but this can cause us to create
                    # long stalls when there are a lot of steps which fail to do
                    # anything useful. In order to avoid this, once we've noticed
                    # we're in a stall (i.e. half of max_failures calls have failed
                    # to do anything) we switch to randomly jumping around. If we
                    # find a success then we'll resume deterministic order from
                    # there which, with any luck, is in a new good region.
                    if not self.step(sp, random_order=failures >= max_failures // 2):
                        # step returns False when there is nothing to do because
                        # the entire choice tree is exhausted. If this happens
                        # we break because we literally can't run this pass any
                        # more than we already have until something else makes
                        # progress.
                        break
                    any_ran = True

                    # Don't count steps that didn't actually try to do
                    # anything as failures. Otherwise, this call is a failure
                    # if it failed to make any changes to the shrink target.
                    if initial_calls != self.calls:
                        if prev is not self.shrink_target:
                            failures = 0
                        else:
                            max_calls_per_failing_step = max(
                                max_calls_per_failing_step, self.calls - initial_calls
                            )
                            failures += 1

                # We reorder the shrink passes so that on our next run through
                # we try good ones first. The rule is that shrink passes that
                # did nothing useful are the worst, shrink passes that reduced
                # the length are the best.
                if self.shrink_target is before_sp:
                    reordering[sp] = 1
                elif len(self.choices) < len(before_sp.choices):
                    reordering[sp] = -1
                else:
                    reordering[sp] = 0

            passes.sort(key=reordering.__getitem__)

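    # Illustrative note (not original commentary): reordering[sp] is -1 for
    # passes that deleted choices, 0 for passes that shrank without deleting,
    # and 1 for passes that made no progress, so the sort above runs
    # deletion-heavy passes first on the next iteration.
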

    @property
    def nodes(self) -> tuple[ChoiceNode, ...]:
        return self.shrink_target.nodes

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return self.shrink_target.choices

    @property
    def spans(self) -> Spans:
        return self.shrink_target.spans

    @derived_value  # type: ignore
    def spans_by_label(self):
        """
        A mapping of labels to a list of spans with that label. Spans in the list
        are ordered by their normal index order.
        """

        spans_by_label = defaultdict(list)
        for ex in self.spans:
            spans_by_label[ex.label].append(ex)
        return dict(spans_by_label)

    @derived_value  # type: ignore
    def distinct_labels(self):
        return sorted(self.spans_by_label, key=str)

    def pass_to_descendant(self, chooser):
        """Attempt to replace each span with a descendant span.

        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:

        binary_tree = st.deferred(
            lambda: st.one_of(
                st.integers(), st.tuples(binary_tree, binary_tree)))

        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.

        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """

        label = chooser.choose(
            self.distinct_labels, lambda l: len(self.spans_by_label[l]) >= 2
        )

        spans = self.spans_by_label[label]
        i = chooser.choose(range(len(spans) - 1))
        ancestor = spans[i]

        if i + 1 == len(spans) or spans[i + 1].start >= ancestor.end:
            return

        @self.cached(label, i)
        def descendants():
            lo = i + 1
            hi = len(spans)
            while lo + 1 < hi:
                mid = (lo + hi) // 2
                if spans[mid].start >= ancestor.end:
                    hi = mid
                else:
                    lo = mid
            return [
                span
                for span in spans[i + 1 : hi]
                if span.choice_count < ancestor.choice_count
            ]

        descendant = chooser.choose(descendants, lambda ex: ex.choice_count > 0)

        assert ancestor.start <= descendant.start
        assert ancestor.end >= descendant.end
        assert descendant.choice_count < ancestor.choice_count

        self.consider_new_nodes(
            self.nodes[: ancestor.start]
            + self.nodes[descendant.start : descendant.end]
            + self.nodes[ancestor.end :]
        )


    def lower_common_node_offset(self):
        """Sometimes we find ourselves in a situation where changes to one part
        of the choice sequence unlock changes to other parts. Sometimes this is
        good, but sometimes this can cause us to exhibit exponential slow
        downs!

        e.g. suppose we had the following:

        m = draw(integers(min_value=0))
        n = draw(integers(min_value=0))
        assert abs(m - n) > 1

        If this fails then we'll end up with a loop where on each iteration we
        reduce each of m and n by 2 - m can't go lower because of n, then n
        can't go lower because of m.

        This will take us O(m) iterations to complete, which is exponential in
        the data size, as we gradually zig zag our way towards zero.

        This can only happen if we're failing to reduce the size of the choice
        sequence: The number of iterations that reduce the length of the choice
        sequence is bounded by that length.

        So what we do is this: We keep track of which nodes are changing, and
        then if there's some non-zero common offset to them we try and minimize
        them all at once by lowering that offset.

        This may not work, and it definitely won't get us out of all possible
        exponential slow downs (an example of where it doesn't is where the
        shape of the nodes changes as a result of this bouncing behaviour),
        but it fails fast when it doesn't work and gets us out of a really
        nastily slow case when it does.
        """
        if len(self.__changed_nodes) <= 1:
            return

        changed = []
        for i in sorted(self.__changed_nodes):
            node = self.nodes[i]
            if node.trivial or node.type != "integer":
                continue
            changed.append(node)

        if not changed:
            return

        ints = [
            abs(node.value - node.constraints["shrink_towards"]) for node in changed
        ]
        offset = min(ints)
        assert offset > 0

        for i in range(len(ints)):
            ints[i] -= offset

        st = self.shrink_target

        def offset_node(node, n):
            return (
                node.index,
                node.index + 1,
                [node.copy(with_value=node.constraints["shrink_towards"] + n)],
            )

        def consider(n, sign):
            return self.consider_new_nodes(
                replace_all(
                    st.nodes,
                    [
                        offset_node(node, sign * (n + v))
                        for node, v in zip(changed, ints)
                    ],
                )
            )

        # shrink from both sides
        Integer.shrink(offset, lambda n: consider(n, 1))
        Integer.shrink(offset, lambda n: consider(n, -1))
        self.clear_change_tracking()

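    # Worked example (illustrative): if the changed nodes have values 7 and 9
    # with shrink_towards == 0, then ints == [7, 9], offset == 7, and after
    # subtracting the offset ints == [0, 2]. Integer.shrink(7, ...) then
    # proposes coordinated replacements such as (0, 2), (1, 3), ..., lowering
    # both nodes by a shared amount in O(log(offset)) calls instead of
    # zig-zagging them down one step at a time.
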

    def clear_change_tracking(self):
        self.__last_checked_changed_at = self.shrink_target
        self.__all_changed_nodes = set()

    def mark_changed(self, i):
        self.__changed_nodes.add(i)

    @property
    def __changed_nodes(self) -> set[int]:
        if self.__last_checked_changed_at is self.shrink_target:
            return self.__all_changed_nodes

        prev_target = self.__last_checked_changed_at
        new_target = self.shrink_target
        assert prev_target is not new_target
        prev_nodes = prev_target.nodes
        new_nodes = new_target.nodes
        assert sort_key(new_target.nodes) < sort_key(prev_target.nodes)

        if len(prev_nodes) != len(new_nodes) or any(
            n1.type != n2.type for n1, n2 in zip(prev_nodes, new_nodes)
        ):
            # should we check constraints are equal as well?
            self.__all_changed_nodes = set()
        else:
            assert len(prev_nodes) == len(new_nodes)
            for i, (n1, n2) in enumerate(zip(prev_nodes, new_nodes)):
                assert n1.type == n2.type
                if not choice_equal(n1.value, n2.value):
                    self.__all_changed_nodes.add(i)

        return self.__all_changed_nodes

    def update_shrink_target(self, new_target):
        assert isinstance(new_target, ConjectureResult)
        self.shrinks += 1
        # If we are just taking a long time to shrink we don't want to
        # trigger this heuristic, so whenever we shrink successfully
        # we give ourselves a bit of breathing room to make sure we
        # would find a shrink that took that long to find the next time.
        # The case where we're taking a long time but making steady
        # progress is handled by `finish_shrinking_deadline` in engine.py
        self.max_stall = max(
            self.max_stall, (self.calls - self.calls_at_last_shrink) * 2
        )
        self.calls_at_last_shrink = self.calls
        self.shrink_target = new_target
        self.__derived_values = {}


    def try_shrinking_nodes(self, nodes, n):
        """Attempts to replace each node in the nodes list with n. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target).

        In current usage it is expected that each of the nodes currently has
        the same value and choice_type, although this is not essential. Note
        that n must be less than the value of the node at min(nodes), or this
        is not a valid shrink.

        This method will attempt to do some small amount of work to delete data
        that occurs after the end of the nodes. This is useful for cases where
        there is some size dependency on the value of a node.
        """
        # If the length of the shrink target has changed from under us such that
        # the indices are out of bounds, give up on the replacement.
        # TODO_BETTER_SHRINK: we probably want to narrow down the root cause here at some point.
        if any(node.index >= len(self.nodes) for node in nodes):
            return  # pragma: no cover

        initial_attempt = replace_all(
            self.nodes,
            [(node.index, node.index + 1, [node.copy(with_value=n)]) for node in nodes],
        )

        attempt = self.cached_test_function(initial_attempt)[1]

        if attempt is None:
            return False

        if attempt is self.shrink_target:
            # if the initial shrink was a success, try lowering offsets.
            self.lower_common_node_offset()
            return True

        # If this produced something completely invalid we ditch it
        # here rather than trying to persevere.
        if attempt.status is Status.OVERRUN:
            return False

        if attempt.status is Status.INVALID:
            return False

        if attempt.misaligned_at is not None:
            # we're invalid due to a misalignment in the tree. We'll try to fix
            # a very specific type of misalignment here: where we have a node of
            # {"size": n} and tried to draw the same node, but with {"size": m < n}.
            # This can occur with eg
            #
            #   n = data.draw_integer()
            #   s = data.draw_string(min_size=n)
            #
            # where we try lowering n, resulting in the test_function drawing a lower
            # min_size than our attempt had for the draw_string node.
            #
            # We'll now try realigning this tree by:
            # * replacing the constraints in our attempt with what test_function tried
            #   to draw in practice
            # * truncating the value of that node to match min_size
            #
            # This helps in the specific case of drawing a value and then drawing
            # a collection of that size...and not much else. In practice this
            # helps because this antipattern is fairly common.

            # TODO we'll probably want to apply the same trick as in the valid
            # case of this function of preserving from the right instead of
            # preserving from the left. see test_can_shrink_variable_string_draws.

            (index, attempt_choice_type, attempt_constraints, _attempt_forced) = (
                attempt.misaligned_at
            )
            node = self.nodes[index]
            if node.type != attempt_choice_type:
                return False  # pragma: no cover
            if node.was_forced:
                return False  # pragma: no cover

            if node.type in {"string", "bytes"}:
                # if the size *increased*, we would have to guess what to pad with
                # in order to try fixing up this attempt. Just give up.
                if node.constraints["min_size"] <= attempt_constraints["min_size"]:
                    # attempts which increase min_size tend to overrun rather than
                    # be misaligned, making a covering case difficult.
                    return False  # pragma: no cover
                # the size decreased in our attempt. Try again, but truncate the value
                # to that size by removing any elements past min_size.
                return self.consider_new_nodes(
                    initial_attempt[: node.index]
                    + [
                        initial_attempt[node.index].copy(
                            with_constraints=attempt_constraints,
                            with_value=initial_attempt[node.index].value[
                                : attempt_constraints["min_size"]
                            ],
                        )
                    ]
                    + initial_attempt[node.index :]
                )

        lost_nodes = len(self.nodes) - len(attempt.nodes)
        if lost_nodes <= 0:
            return False

        start = nodes[0].index
        end = nodes[-1].index + 1
        # We now look for contiguous regions to delete that might help fix up
        # this failed shrink. We only look for contiguous regions of the right
        # lengths because doing anything more than that starts to get very
        # expensive. See minimize_individual_choices for where we
        # try to be more aggressive.
        regions_to_delete = {(end, end + lost_nodes)}

        for ex in self.spans:
            if ex.start > start:
                continue
            if ex.end <= end:
                continue

            if ex.index >= len(attempt.spans):
                continue  # pragma: no cover

            replacement = attempt.spans[ex.index]
            in_original = [c for c in ex.children if c.start >= end]
            in_replaced = [c for c in replacement.children if c.start >= end]

            if len(in_replaced) >= len(in_original) or not in_replaced:
                continue

            # We've found a span where some of the children went missing
            # as a result of this change, and just replacing it with the data
            # it would have had and removing the spillover didn't work. This
            # means that some of its children towards the right must be
            # important, so we try to arrange it so that it retains its
            # rightmost children instead of its leftmost.
            regions_to_delete.add(
                (in_original[0].start, in_original[-len(in_replaced)].start)
            )

        for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):
            try_with_deleted = initial_attempt[:u] + initial_attempt[v:]
            if self.consider_new_nodes(try_with_deleted):
                return True

        return False


    def remove_discarded(self):
        """Try removing all choices marked as discarded.

        This is primarily to deal with data that has been ignored while
        doing rejection sampling - e.g. as a result of an integer range, or a
        filtered strategy.

        Such data will also be handled by the adaptive_example_deletion pass,
        but that pass is necessarily more conservative and will try deleting
        each interval individually. The common case is that all data drawn and
        rejected can just be thrown away immediately in one block, so this pass
        will be much faster than trying each one individually when it works.

        Returns False if there is discarded data and removing it does not work,
        otherwise returns True.
        """
        while self.shrink_target.has_discards:
            discarded = []

            for ex in self.shrink_target.spans:
                if (
                    ex.choice_count > 0
                    and ex.discarded
                    and (not discarded or ex.start >= discarded[-1][-1])
                ):
                    discarded.append((ex.start, ex.end))

            # This can happen if we have discards but they are all of
            # zero length. This shouldn't happen very often so it's
            # faster to check for it here than at the point of example
            # generation.
            if not discarded:
                break

            attempt = list(self.nodes)
            for u, v in reversed(discarded):
                del attempt[u:v]

            if not self.consider_new_nodes(tuple(attempt)):
                return False
        return True

    @derived_value  # type: ignore
    def duplicated_nodes(self):
        """Returns a list of lists of nodes grouped by (choice_type, value)."""
        duplicates = defaultdict(list)
        for node in self.nodes:
            duplicates[(node.type, choice_key(node.value))].append(node)
        return list(duplicates.values())

    def node_program(self, program: str) -> ShrinkPass:
        return ShrinkPass(
            lambda chooser: self._node_program(chooser, program),
            name=f"node_program_{program}",
        )


    def _node_program(self, chooser, program):
        n = len(program)
        # Adaptively attempt to run the node program at the current
        # index. If this successfully applies the node program ``k`` times
        # then this runs in ``O(log(k))`` test function calls.
        i = chooser.choose(range(len(self.nodes) - n + 1))

        # First, run the node program at the chosen index. If this fails,
        # don't do any extra work, so that failure is as cheap as possible.
        if not self.run_node_program(i, program, original=self.shrink_target):
            return

        # Because we run in a random order we will often find ourselves in the
        # middle of a region where we could run the node program. We thus start
        # by moving left to the beginning of that region if possible.
        def offset_left(k):
            return i - k * n

        i = offset_left(
            find_integer(
                lambda k: self.run_node_program(
                    offset_left(k), program, original=self.shrink_target
                )
            )
        )

        original = self.shrink_target
        # Now try to run the node program multiple times here.
        find_integer(
            lambda k: self.run_node_program(i, program, original=original, repeats=k)
        )

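    # For example (an illustrative note, assuming run_node_program - defined
    # elsewhere in this file - interprets "X" as deleting the node at that
    # position): node_program("XX") picks an index i and tries deleting
    # self.nodes[i : i + 2]. When that succeeds, the find_integer calls above
    # extend the deletion leftwards and then repeat it in place, so removing k
    # adjacent pairs costs O(log(k)) calls.
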

    def minimize_duplicated_choices(self, chooser):
        """Find choices that have been duplicated in multiple places and attempt
        to minimize all of the duplicates simultaneously.

        This lets us handle cases where two values can't be shrunk
        independently of each other but can easily be shrunk together.
        For example if we had something like:

        ls = data.draw(lists(integers()))
        y = data.draw(integers())
        assert y not in ls

        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
        to replace both 3s with 0, this would be a valid shrink, but if we were
        to replace either 3 with 0 on its own the test would start passing.

        It is also useful for when that duplication is accidental and the values
        of the choices don't matter very much, because it allows us to replace
        more values at once.
        """
        nodes = chooser.choose(self.duplicated_nodes)
        # we can't lower any nodes which are trivial. try proceeding with the
        # remaining nodes.
        nodes = [node for node in nodes if not node.trivial]
        if len(nodes) <= 1:
            return

        self.minimize_nodes(nodes)


    def redistribute_numeric_pairs(self, chooser):
        """If the sum of two generated numbers has to exceed some bound,
        lowering one of them requires raising the other. This pass enables
        that."""

        # look for a pair of nodes (node1, node2) which are both numeric
        # and aren't separated by too many other nodes. We'll decrease node1 and
        # increase node2 (note that the other way around doesn't make sense as
        # it's strictly worse in the ordering).
        def can_choose_node(node):
            # don't choose nan, inf, or floats above the threshold where f + 1 > f
            # (which is not necessarily true for floats above MAX_PRECISE_INTEGER).
            # The motivation for the last condition is to avoid trying weird
            # non-shrinks where we raise one node and think we lowered another
            # (but didn't).
            return node.type in {"integer", "float"} and not (
                node.type == "float"
                and (math.isnan(node.value) or abs(node.value) >= MAX_PRECISE_INTEGER)
            )

        node1 = chooser.choose(
            self.nodes,
            lambda node: can_choose_node(node) and not node.trivial,
        )
        node2 = chooser.choose(
            self.nodes,
            lambda node: can_choose_node(node)
            # Note that it's fine for node2 to be trivial, because we're going to
            # explicitly make it *not* trivial by adding to its value.
            and not node.was_forced
            # to avoid quadratic behavior, scan ahead only a small amount for
            # the related node.
            and node1.index < node.index <= node1.index + 4,
        )

        m: Union[int, float] = node1.value
        n: Union[int, float] = node2.value

        def boost(k: int) -> bool:
            # floats always shrink towards 0
            shrink_towards = (
                node1.constraints["shrink_towards"] if node1.type == "integer" else 0
            )
            if k > abs(m - shrink_towards):
                return False

            # We are trying to move node1 (m) closer to shrink_towards, and node2
            # (n) farther away from shrink_towards. If m is below shrink_towards,
            # we want to add to m and subtract from n, and vice versa if above
            # shrink_towards.
            if m < shrink_towards:
                k = -k

            try:
                v1 = m - k
                v2 = n + k
            except OverflowError:  # pragma: no cover
                # if n or m is a float and k is over sys.float_info.max, coercing
                # k to a float will overflow.
                return False

            # if we've increased node2 to the point that we're past max precision,
            # give up - things have become too unstable.
            if node1.type == "float" and abs(v2) >= MAX_PRECISE_INTEGER:
                return False

            return self.consider_new_nodes(
                self.nodes[: node1.index]
                + (node1.copy(with_value=v1),)
                + self.nodes[node1.index + 1 : node2.index]
                + (node2.copy(with_value=v2),)
                + self.nodes[node2.index + 1 :]
            )

        find_integer(boost)

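    # Illustrative example (a hypothetical test, not from this file): given
    #
    #     m = data.draw(st.integers(min_value=0))
    #     n = data.draw(st.integers(min_value=0))
    #     assert m + n < 100
    #
    # a failing case like (m, n) == (50, 51) cannot lower m on its own without
    # the test starting to pass, but boost() can move it to (0, 101), which
    # sorts strictly earlier because the first differing choice is smaller.
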

    def lower_integers_together(self, chooser):
        node1 = chooser.choose(
            self.nodes, lambda n: n.type == "integer" and not n.trivial
        )
        # Search up to 3 nodes ahead, to avoid quadratic time.
        node2 = self.nodes[
            chooser.choose(
                range(node1.index + 1, min(len(self.nodes), node1.index + 3 + 1)),
                lambda i: self.nodes[i].type == "integer"
                and not self.nodes[i].was_forced,
            )
        ]

        # one might expect us to require node2 to be nontrivial, and to minimize
        # the node which is closer to its shrink_towards, rather than node1
        # unconditionally. In reality, it's acceptable for us to transition node2
        # from trivial to nontrivial, because the shrink ordering is dominated by
        # the complexity of the earlier node1. What matters is minimizing node1.
        shrink_towards = node1.constraints["shrink_towards"]

        def consider(n):
            return self.consider_new_nodes(
                self.nodes[: node1.index]
                + (node1.copy(with_value=node1.value - n),)
                + self.nodes[node1.index + 1 : node2.index]
                + (node2.copy(with_value=node2.value - n),)
                + self.nodes[node2.index + 1 :]
            )

        find_integer(lambda n: consider(shrink_towards - n))
        find_integer(lambda n: consider(n - shrink_towards))

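    # Illustrative example (a hypothetical test, not from this file): for
    #
    #     start = data.draw(st.integers(min_value=0))
    #     end = data.draw(st.integers(min_value=start))
    #
    # a failure may depend only on end - start, so neither choice can usually
    # be lowered alone, but subtracting the same amount from both preserves
    # the gap and lets the pair shrink towards zero together.
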

1427 def lower_duplicated_characters(self, chooser): 

1428 """ 

1429 Select two string choices no more than 4 choices apart and simultaneously 

1430 lower characters which appear in both strings. This helps cases where the 

1431 same character must appear in two strings, but the actual value of the 

1432 character is not relevant. 

1433 

1434 This shrinking pass currently only tries lowering *all* instances of the 

1435 duplicated character in both strings. So for instance, given two choices: 

1436 

1437 "bbac" 

1438 "abbb" 

1439 

1440 we would try lowering all five of the b characters simultaneously. This 

1441 may fail to shrink some cases where only certain character indices are 

1442 correlated, for instance if only the b at index 1 could be lowered 

1443 simultaneously and the rest did in fact actually have to be a `b`. 

1444 

1445 It would be nice to try shrinking that case as well, but we would need good 

1446 safeguards because it could get very expensive to try all combinations. 

1447 I expect lowering all duplicates to handle most cases in the meantime. 

1448 """ 

1449 node1 = chooser.choose( 

1450 self.nodes, lambda n: n.type == "string" and not n.trivial 

1451 ) 

1452 

1453 # limit search to up to 4 choices ahead, to avoid quadratic behavior 

1454 node2 = self.nodes[ 

1455 chooser.choose( 

1456 range(node1.index + 1, min(len(self.nodes), node1.index + 1 + 4)), 

1457 lambda i: self.nodes[i].type == "string" and not self.nodes[i].trivial 

1458 # select nodes which share at least one character with node1 

1459 and set(node1.value) & set(self.nodes[i].value), 

1460 ) 

1461 ] 

1462 

1463 duplicated_characters = set(node1.value) & set(node2.value) 

1464 # deterministic ordering 

1465 char = chooser.choose(sorted(duplicated_characters)) 

1466 intervals = node1.constraints["intervals"] 

1467 

1468 def copy_node(node, n): 

1469 # replace all duplicate characters in each string. This might miss 

1470 # some shrinks compared to only replacing some, but trying all possible 

1471 # combinations of indices could get expensive if done without some 

1472 # thought. 

1473 return node.copy( 

1474 with_value=node.value.replace(char, intervals.char_in_shrink_order(n)) 

1475 ) 

1476 

1477 Integer.shrink( 

1478 intervals.index_from_char_in_shrink_order(char), 

1479 lambda n: self.consider_new_nodes( 

1480 self.nodes[: node1.index] 

1481 + (copy_node(node1, n),) 

1482 + self.nodes[node1.index + 1 : node2.index] 

1483 + (copy_node(node2, n),) 

1484 + self.nodes[node2.index + 1 :] 

1485 ), 

1486 ) 
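# Editor's note (not in the original source): a sketch of a property where a
# character must be shared between two strings but its exact value is
# irrelevant, which this pass shrinks well:
#
#     from hypothesis import given, strategies as st
#
#     @given(st.text(min_size=1), st.text(min_size=1))
#     def test_shared_char(x, y):
#         assert x[0] != y[0]    # fails whenever the first characters coincide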

1487 

1488 def minimize_nodes(self, nodes): 

1489 choice_type = nodes[0].type 

1490 value = nodes[0].value 

1491 # unlike choice_type and value, constraints are *not* guaranteed to be equal among all 

1492 # passed nodes. We arbitrarily use the constraints of the first node. I think 

1493 # this is unsound (= leads to us trying shrinks that could not have been 

1494 # generated), but those get discarded at test-time, and this enables useful 

1495 # slips where constraints are not equal but are close enough that doing the 

1496 # same operation on both basically just works. 

1497 constraints = nodes[0].constraints 

1498 assert all( 

1499 node.type == choice_type and choice_equal(node.value, value) 

1500 for node in nodes 

1501 ) 

1502 

1503 if choice_type == "integer": 

1504 shrink_towards = constraints["shrink_towards"] 

1505 # try shrinking from both sides towards shrink_towards. 

1506 # we're starting from n = abs(shrink_towards - value). Because the 

1507 # shrinker will not check its starting value, we need to try 

1508 # shrinking to n first. 

1509 self.try_shrinking_nodes(nodes, abs(shrink_towards - value)) 

1510 Integer.shrink( 

1511 abs(shrink_towards - value), 

1512 lambda n: self.try_shrinking_nodes(nodes, shrink_towards + n), 

1513 ) 

1514 Integer.shrink( 

1515 abs(shrink_towards - value), 

1516 lambda n: self.try_shrinking_nodes(nodes, shrink_towards - n), 

1517 ) 

1518 elif choice_type == "float": 

1519 self.try_shrinking_nodes(nodes, abs(value)) 

1520 Float.shrink( 

1521 abs(value), 

1522 lambda val: self.try_shrinking_nodes(nodes, val), 

1523 ) 

1524 Float.shrink( 

1525 abs(value), 

1526 lambda val: self.try_shrinking_nodes(nodes, -val), 

1527 ) 

1528 elif choice_type == "boolean": 

1529 # must be True; otherwise the node would be trivial and not selected. 

1530 assert value is True 

1531 # only one thing to try: False! 

1532 self.try_shrinking_nodes(nodes, False) 

1533 elif choice_type == "bytes": 

1534 Bytes.shrink( 

1535 value, 

1536 lambda val: self.try_shrinking_nodes(nodes, val), 

1537 min_size=constraints["min_size"], 

1538 ) 

1539 elif choice_type == "string": 

1540 String.shrink( 

1541 value, 

1542 lambda val: self.try_shrinking_nodes(nodes, val), 

1543 intervals=constraints["intervals"], 

1544 min_size=constraints["min_size"], 

1545 ) 

1546 else: 

1547 raise NotImplementedError 
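# Editor's note (not in the original source): a worked example of the
# two-sided integer shrink above. With value == 12 and shrink_towards == 5,
# we start from n == abs(5 - 12) == 7; the first Integer.shrink call probes
# shrink_towards + n (12, then values approaching 5 from above) and the
# second probes shrink_towards - n (-2, then values approaching 5 from below):
#
#     shrink_towards, value = 5, 12
#     n = abs(shrink_towards - value)              # 7
#     shrink_towards + n, shrink_towards - n       # (12, -2)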

1548 

1549 def try_trivial_spans(self, chooser): 

1550 i = chooser.choose(range(len(self.spans))) 

1551 

1552 prev = self.shrink_target 

1553 nodes = self.shrink_target.nodes 

1554 ex = self.spans[i] 

1555 prefix = nodes[: ex.start] 

1556 replacement = tuple( 

1557 [ 

1558 ( 

1559 node 

1560 if node.was_forced 

1561 else node.copy( 

1562 with_value=choice_from_index(0, node.type, node.constraints) 

1563 ) 

1564 ) 

1565 for node in nodes[ex.start : ex.end] 

1566 ] 

1567 ) 

1568 suffix = nodes[ex.end :] 

1569 attempt = self.cached_test_function(prefix + replacement + suffix)[1] 

1570 

1571 if self.shrink_target is not prev: 

1572 return 

1573 

1574 if isinstance(attempt, ConjectureResult): 

1575 new_ex = attempt.spans[i] 

1576 new_replacement = attempt.nodes[new_ex.start : new_ex.end] 

1577 self.consider_new_nodes(prefix + new_replacement + suffix) 
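# Editor's note (not in the original source): choice_from_index(0, ...) yields
# the simplest value a node's constraints permit, so the replacement above
# rewrites an entire span to its all-trivial form in one attempt. For an
# integer node this is its shrink_towards value; the constraints dict below
# is a hypothetical illustration of that shape:
#
#     from hypothesis.internal.conjecture.choice import choice_from_index
#     constraints = {"min_value": None, "max_value": None,
#                    "shrink_towards": 0, "weights": None}
#     choice_from_index(0, "integer", constraints)    # expected: 0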

1578 

1579 def minimize_individual_choices(self, chooser): 

1580 """Attempt to minimize each choice in sequence. 

1581 

1582 This is the pass that ensures that e.g. each integer we draw is as 

1583 small as possible. So it's the part that guarantees that if we e.g. do 

1584 

1585 x = data.draw(integers()) 

1586 assert x < 10 

1587 

1588 then in our shrunk example, x = 10 rather than, say, 97. 

1589 

1590 If we are unsuccessful at minimizing a choice of interest we then 

1591 check if that's because it's changing the size of the test case and, 

1592 if so, we also make an attempt to delete parts of the test case to 

1593 see if that fixes it. 

1594 

1595 We handle most of the common cases in try_shrinking_nodes which is 

1596 pretty good at clearing out large contiguous blocks of dead space, 

1597 but it fails when there is data that has to stay in particular places 

1598 in the list. 

1599 """ 

1600 node = chooser.choose(self.nodes, lambda node: not node.trivial) 

1601 initial_target = self.shrink_target 

1602 

1603 self.minimize_nodes([node]) 

1604 if self.shrink_target is not initial_target: 

1605 # the shrink target changed, so our shrink worked. Defer doing 

1606 # anything more intelligent until this shrink fails. 

1607 return 

1608 

1609 # the shrink failed. One particularly common case where minimizing a 

1610 # node can fail is the antipattern of drawing a size and then drawing a 

1611 # collection of that size, or more generally when there is a size 

1612 # dependency on some single node. We'll explicitly try and fix up this 

1613 # common case here: if decreasing an integer node by one would reduce 

1614 # the size of the generated input, we'll try deleting things after that 

1615 # node and see if the resulting attempt works. 
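# Editor's note (not in the original source): the antipattern in question,
# sketched with interactive draws (assuming data comes from st.data() and
# st is hypothesis.strategies):
#
#     n = data.draw(st.integers(0, 10))
#     xs = [data.draw(st.booleans()) for _ in range(n)]
#
# Lowering n by one leaves a surplus boolean draw in the choice sequence, so
# the fixup below pairs the lowering with deletions after the integer node.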

1616 

1617 if node.type != "integer": 

1618 # Only try this fixup logic on integer draws. Almost all size 

1619 # dependencies are on integer draws, and when one is not, the test is 

1620 # doing something convoluted enough that it is unlikely to shrink well anyway. 

1621 # TODO: extend to floats? We probably currently fail on the following, 

1622 # albeit convoluted example: 

1623 # n = int(data.draw(st.floats())) 

1624 # s = data.draw(st.lists(st.integers(), min_size=n, max_size=n)) 

1625 return 

1626 

1627 lowered = ( 

1628 self.nodes[: node.index] 

1629 + (node.copy(with_value=node.value - 1),) 

1630 + self.nodes[node.index + 1 :] 

1631 ) 

1632 attempt = self.cached_test_function(lowered)[1] 

1633 if ( 

1634 attempt is None 

1635 or attempt.status < Status.VALID 

1636 or len(attempt.nodes) == len(self.nodes) 

1637 or len(attempt.nodes) == node.index + 1 

1638 ): 

1639 # no point in trying our size-dependency-logic if our attempt at 

1640 # lowering the node resulted in: 

1641 # * an invalid conjecture data 

1642 # * the same number of nodes as before 

1643 # * no nodes beyond the lowered node (nothing to try to delete afterwards) 

1644 return 

1645 

1646 # If the attempt were the shrink target, then the original shrink would 

1647 # have worked and we could never have gotten here. 

1648 assert attempt is not self.shrink_target 

1649 

1650 @self.cached(node.index) 

1651 def first_span_after_node(): 

1652 lo = 0 

1653 hi = len(self.spans) 

1654 while lo + 1 < hi: 

1655 mid = (lo + hi) // 2 

1656 ex = self.spans[mid] 

1657 if ex.start >= node.index: 

1658 hi = mid 

1659 else: 

1660 lo = mid 

1661 return hi 
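# Editor's note (not in the original source): the loop above is a plain
# binary search for the index of the first span starting at or after
# node.index; conceptually it matches
#
#     import bisect
#     bisect.bisect_left([ex.start for ex in self.spans], node.index)
#
# while avoiding materialising the list of start indices.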

1662 

1663 # We try deleting both entire spans and single nodes. 

1664 # If we wanted to get more aggressive, we could try deleting n 

1665 # consecutive nodes (that don't cross a span boundary) for, say, 

1666 # n <= 2 or n <= 3. 

1667 if chooser.choose([True, False]): 

1668 ex = self.spans[ 

1669 chooser.choose( 

1670 range(first_span_after_node, len(self.spans)), 

1671 lambda i: self.spans[i].choice_count > 0, 

1672 ) 

1673 ] 

1674 self.consider_new_nodes(lowered[: ex.start] + lowered[ex.end :]) 

1675 else: 

1676 node = self.nodes[chooser.choose(range(node.index + 1, len(self.nodes)))] 

1677 self.consider_new_nodes(lowered[: node.index] + lowered[node.index + 1 :]) 

1678 

1679 def reorder_spans(self, chooser): 

1680 """This pass allows us to reorder the children of each span. 

1681 

1682 For example, consider the following: 

1683 

1684 .. code-block:: python 

1685 

1686 import hypothesis.strategies as st 

1687 from hypothesis import given 

1688 

1689 

1690 @given(st.text(), st.text()) 

1691 def test_not_equal(x, y): 

1692 assert x != y 

1693 

1694 Without the ability to reorder x and y this could fail either with 

1695 ``x=""``, ``y="0"``, or the other way around. With reordering it will 

1696 reliably fail with ``x=""``, ``y="0"``. 

1697 """ 

1698 ex = chooser.choose(self.spans) 

1699 label = chooser.choose(ex.children).label 

1700 

1701 spans = [c for c in ex.children if c.label == label] 

1702 if len(spans) <= 1: 

1703 return 

1704 st = self.shrink_target 

1705 endpoints = [(ex.start, ex.end) for ex in spans] 

1706 

1707 Ordering.shrink( 

1708 range(len(spans)), 

1709 lambda indices: self.consider_new_nodes( 

1710 replace_all( 

1711 st.nodes, 

1712 [ 

1713 ( 

1714 u, 

1715 v, 

1716 st.nodes[spans[i].start : spans[i].end], 

1717 ) 

1718 for (u, v), i in zip(endpoints, indices) 

1719 ], 

1720 ) 

1721 ), 

1722 key=lambda i: sort_key(st.nodes[spans[i].start : spans[i].end]), 

1723 ) 
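# Editor's note (not in the original source): replace_all, from
# hypothesis.internal.conjecture.junkdrawer, splices several non-overlapping
# (start, end, replacement) ranges into a sequence in one pass. A toy
# illustration, treating the exact return type as an implementation detail:
#
#     from hypothesis.internal.conjecture.junkdrawer import replace_all
#     # overwrite indices [0, 2) with (9,) and delete index 3:
#     replace_all((0, 1, 2, 3), [(0, 2, (9,)), (3, 4, ())])    # -> [9, 2]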

1724 

1725 def run_node_program(self, i, program, original, repeats=1): 

1726 """Node programs are a mini-DSL for node rewriting, defined as a sequence 

1727 of commands that can be run at some index into the nodes 

1728 

1729 Commands are: 

1730 

1731 * "X", delete this node 

1732 

1733 This method runs the node program in ``program`` at node index 

1734 ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it 

1735 will attempt to approximate the results of running it that many times. 

1736 

1737 Returns True if this successfully changes the underlying shrink target, 

1738 else False. 

1739 """ 

1740 if i + len(program) > len(original.nodes) or i < 0: 

1741 return False 

1742 attempt = list(original.nodes) 

1743 for _ in range(repeats): 

1744 for k, command in reversed(list(enumerate(program))): 

1745 j = i + k 

1746 if j >= len(attempt): 

1747 return False 

1748 

1749 if command == "X": 

1750 del attempt[j] 

1751 else: 

1752 raise NotImplementedError(f"Unrecognised command {command!r}") 

1753 

1754 return self.consider_new_nodes(attempt)
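# Editor's note (not in the original source): a sketch of invoking the "X"
# program, deleting two adjacent nodes starting at index 3 and approximating
# five repeated applications:
#
#     shrinker.run_node_program(3, "XX", shrinker.shrink_target, repeats=5)
#
# Within each repeat, commands run in reverse index order so that a deletion
# cannot shift the positions of commands that have not yet been applied.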