Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/shrinker.py: 19%

659 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import math
from collections import defaultdict
from collections.abc import Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast

import attr

from hypothesis.internal.conjecture.choice import (
    ChoiceNode,
    ChoiceT,
    choice_equal,
    choice_from_index,
    choice_key,
    choice_permitted,
    choice_to_index,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    Spans,
    Status,
    _Overrun,
    draw_choice,
)
from hypothesis.internal.conjecture.junkdrawer import (
    endswith,
    find_integer,
    replace_all,
    startswith,
)
from hypothesis.internal.conjecture.shrinking import (
    Bytes,
    Float,
    Integer,
    Ordering,
    String,
)
from hypothesis.internal.conjecture.shrinking.choicetree import (
    ChoiceTree,
    prefix_selection_order,
    random_selection_order,
)
from hypothesis.internal.floats import MAX_PRECISE_INTEGER

if TYPE_CHECKING:
    from random import Random
    from typing import TypeAlias

    from hypothesis.internal.conjecture.engine import ConjectureRunner

ShrinkPredicateT: "TypeAlias" = Callable[[Union[ConjectureResult, _Overrun]], bool]


def sort_key(nodes: Sequence[ChoiceNode]) -> tuple[int, tuple[int, ...]]:
    """Returns a sort key such that "simpler" choice sequences are smaller than
    "more complicated" ones.

    We define sort_key so that x is simpler than y if x is shorter than y or if
    they have the same length and map(choice_to_index, x) < map(choice_to_index, y).

    The reasons for using this ordering are:

    1. If x is shorter than y then that means we had to make fewer decisions
       in constructing the test case when we ran x than we did when we ran y.
    2. If x is the same length as y then replacing a choice with a lower index
       choice corresponds to replacing it with a simpler/smaller choice.
    3. Because choices drawn early in generation potentially get used in more
       places, they potentially have a more significant impact on the final
       result, so it makes sense to prioritise reducing earlier choices over
       later ones.
    """
    return (
        len(nodes),
        tuple(choice_to_index(node.value, node.constraints) for node in nodes),
    )

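# A minimal illustrative sketch (not part of the library) of the shortlex
# ordering sort_key induces, using plain index tuples in place of real
# ChoiceNode sequences:
#
#   key = lambda indices: (len(indices), tuple(indices))
#   assert key([7, 7]) < key([0, 0, 0])  # shorter always sorts first
#   assert key([0, 1]) < key([0, 2])  # same length: lexicographic on indices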

SHRINK_PASS_DEFINITIONS: dict[str, "ShrinkPassDefinition"] = {}


@dataclass
class ShrinkPassDefinition:
    """A shrink pass bundles together a large number of local changes to
    the current shrink target.

    Each shrink pass is defined by some function and some arguments to that
    function. The ``generate_arguments`` function returns all arguments that
    might be useful to run on the current shrink target.

    The guarantee made by methods defined this way is that after they are
    called then *either* the shrink target has changed *or* each of
    ``fn(*args)`` has been called for every ``args`` in ``generate_arguments(self)``.
    No guarantee is made that all of these will be called if the shrink target
    changes.
    """

    run_with_chooser: Any

    @property
    def name(self) -> str:
        return self.run_with_chooser.__name__

    def __post_init__(self) -> None:
        assert self.name not in SHRINK_PASS_DEFINITIONS, self.name
        SHRINK_PASS_DEFINITIONS[self.name] = self


def defines_shrink_pass():
    """A convenient decorator for defining shrink passes."""

    def accept(run_step):
        ShrinkPassDefinition(run_with_chooser=run_step)

        def run(self):
            raise NotImplementedError("Shrink passes should not be run directly")

        run.__name__ = run_step.__name__
        run.is_shrink_pass = True
        return run

    return accept

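# A hypothetical usage sketch (mirroring the real passes defined on Shrinker
# below): a shrink pass is a method decorated with @defines_shrink_pass()
# which takes a chooser and performs a single step's worth of work, e.g.
#
#   @defines_shrink_pass()
#   def delete_one_node(self, chooser):  # illustrative only, not a real pass
#       i = chooser.choose(range(len(self.nodes)))
#       self.consider_new_nodes(self.nodes[:i] + self.nodes[i + 1 :])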

class Shrinker:
    """A shrinker is a child object of a ConjectureRunner which is designed to
    manage the associated state of a particular shrink problem. That is, we
    have some initial ConjectureData object and some property of interest
    that it satisfies, and we want to find a ConjectureData object with a
    shortlex (see sort_key above) smaller choice sequence that exhibits the same
    property.

    Currently the only property of interest we use is that the status is
    INTERESTING and the interesting_origin takes on some fixed value, but we
    may potentially be interested in other use cases later.
    However we assume that data with a status < VALID never satisfies the predicate.

    The shrinker keeps track of a value shrink_target which represents the
    current best known ConjectureData object satisfying the predicate.
    It refines this value by repeatedly running *shrink passes*, which are
    methods that perform a series of transformations to the current shrink_target
    and evaluate the underlying test function to find new ConjectureData
    objects. If any of these satisfy the predicate, the shrink_target
    is updated automatically. Shrinking runs until no shrink pass can
    improve the shrink_target, at which point it stops. It may also be
    terminated if the underlying engine throws RunIsComplete, but that
    is handled by the calling code rather than the Shrinker.

    =======================
    Designing Shrink Passes
    =======================

    Generally a shrink pass is just any function that calls
    cached_test_function and/or consider_new_nodes a number of times,
    but there are a couple of useful things to bear in mind.

    A shrink pass *makes progress* if running it changes self.shrink_target
    (i.e. it tries a shortlex smaller ConjectureData object satisfying
    the predicate). The desired end state of shrinking is to find a
    value such that no shrink pass can make progress, i.e. that we
    are at a local minimum for each shrink pass.

    In aid of this goal, the main invariant that a shrink pass must
    satisfy is that whether it makes progress must be deterministic.
    It is fine (encouraged even) for the specific progress it makes
    to be non-deterministic, but if you run a shrink pass, it makes
    no progress, and then you immediately run it again, it should
    never succeed on the second run. This allows us to stop as soon
    as we have run each shrink pass and seen no progress on any of
    them.

    This means that e.g. it's fine to try each of N deletions
    or replacements in a random order, but it's not OK to try N random
    deletions (unless you have already shrunk at least once, though we
    don't currently take advantage of this loophole).

    Shrink passes need to be written so as to be robust against
    change in the underlying shrink target. It is generally safe
    to assume that the shrink target does not change prior to the
    point of first modification - e.g. if you change no bytes at
    index ``i``, all spans whose start is ``<= i`` still exist,
    as do all blocks, and the data object is still of length
    ``>= i + 1``. This can only be violated by bad user code which
    relies on an external source of non-determinism.

    When the underlying shrink_target changes, shrink
    passes should not run substantially more test_function calls
    on success than they do on failure. Say, no more than a constant
    factor more. In particular shrink passes should not iterate to a
    fixed point.

    This means that shrink passes are often written with loops that
    are carefully designed to do the right thing in the case that no
    shrinks occurred and try to adapt to any changes to do a reasonable
    job. e.g. say we wanted to write a shrink pass that tried deleting
    each individual choice (this isn't an especially good pass,
    but it leads to a simple illustrative example), we might do it
    by iterating over the choice sequence like so:

    .. code-block:: python

        i = 0
        while i < len(self.shrink_target.nodes):
            if not self.consider_new_nodes(
                self.shrink_target.nodes[:i] + self.shrink_target.nodes[i + 1 :]
            ):
                i += 1

    The reason for writing the loop this way is that i is always a
    valid index into the current choice sequence, even if the current sequence
    changes as a result of our actions. When the choice sequence changes,
    we leave the index where it is rather than restarting from the
    beginning, and carry on. This means that the number of steps we
    run in this case is always bounded above by the number of steps
    we would run if nothing works.

    Another thing to bear in mind about shrink pass design is that
    they should prioritise *progress*. If you have N operations that
    you need to run, you should try to order them in such a way as
    to avoid stalling, where you have long periods of test function
    invocations where no shrinks happen. This is bad because whenever
    we shrink we reduce the amount of work the shrinker has to do
    in future, and often speed up the test function, so we ideally
    want those shrinks to happen much earlier in the process.

    Sometimes stalls are inevitable of course - e.g. if the pass
    makes no progress, then the entire thing is just one long stall,
    but it's helpful to design it so that stalls are less likely
    in typical behaviour.

    The two easiest ways to do this are:

    * Just run the N steps in random order. As long as a
      reasonably large proportion of the operations succeed, this
      guarantees the expected stall length is quite short. The
      bookkeeping for making sure this does the right thing when
      it succeeds can be quite annoying.
    * When you have any sort of nested loop, loop in such a way
      that both loop variables change each time. This prevents
      stalls which occur when one particular value for the outer
      loop is impossible to make progress on, rendering the entire
      inner loop into a stall.

    However, although progress is good, too much progress can be
    a bad sign! If you're *only* seeing successful reductions,
    that's probably a sign that you are making changes that are
    too timid. Two useful things to offset this:

    * It's worth writing shrink passes which are *adaptive*, in
      the sense that when operations seem to be working really
      well we try to bundle multiple of them together. This can
      often be used to turn what would be O(m) successful calls
      into O(log(m)); see the sketch below.
    * It's often worth trying one or two special minimal values
      before trying anything more fine grained (e.g. replacing
      the whole thing with zero).

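    As a concrete illustration of the adaptive approach, here is a sketch
    (illustrative only, not one of the real passes) of a deletion step built
    on the ``find_integer`` helper imported above, so that k consecutive
    successful deletions cost O(log(k)) test function calls rather than O(k):

    .. code-block:: python

        # assumes i is a valid index into the current choice sequence
        find_integer(
            lambda n: self.consider_new_nodes(
                self.shrink_target.nodes[:i] + self.shrink_target.nodes[i + n :]
            )
        )
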

    """

    def derived_value(fn):
        """It's useful during shrinking to have access to derived values of
        the current shrink target.

        This decorator allows you to define these as cached properties. They
        are calculated once, then cached until the shrink target changes, then
        recalculated the next time they are used."""

        def accept(self):
            try:
                return self.__derived_values[fn.__name__]
            except KeyError:
                return self.__derived_values.setdefault(fn.__name__, fn(self))

        accept.__name__ = fn.__name__
        return property(accept)

    def __init__(
        self,
        engine: "ConjectureRunner",
        initial: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT],
        *,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ],
        explain: bool,
        in_target_phase: bool = False,
    ):
        """Create a shrinker for a particular engine, with a given starting
        point and predicate. When shrink() is called it will attempt to find an
        example for which predicate is True and which is strictly smaller than
        initial.

        Note that initial is a ConjectureData object, and predicate
        takes ConjectureData objects.
        """
        assert predicate is not None or allow_transition is not None
        self.engine = engine
        self.__predicate = predicate or (lambda data: True)
        self.__allow_transition = allow_transition or (lambda source, destination: True)
        self.__derived_values: dict = {}

        self.initial_size = len(initial.choices)
        # We keep track of the current best example on the shrink_target
        # attribute.
        self.shrink_target = initial
        self.clear_change_tracking()
        self.shrinks = 0

        # We terminate shrinks that seem to have reached their logical
        # conclusion: If we've called the underlying test function at
        # least self.max_stall times since the last time we shrunk,
        # it's time to stop shrinking.
        self.max_stall = 200
        self.initial_calls = self.engine.call_count
        self.initial_misaligned = self.engine.misaligned_count
        self.calls_at_last_shrink = self.initial_calls

        self.passes_by_name: dict[str, ShrinkPass] = {}

        # Because the shrinker is also used to `pareto_optimise` in the target phase,
        # we sometimes want to allow extending buffers instead of aborting at the end.
        self.__extend: Union[Literal["full"], int] = "full" if in_target_phase else 0
        self.should_explain = explain

    @derived_value  # type: ignore
    def cached_calculations(self):
        return {}

    def cached(self, *keys):
        def accept(f):
            cache_key = (f.__name__, *keys)
            try:
                return self.cached_calculations[cache_key]
            except KeyError:
                return self.cached_calculations.setdefault(cache_key, f())

        return accept

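    # Hypothetical usage sketch for ``cached`` (for real usage see
    # pass_to_descendant below): memoise a zero-argument computation,
    # keyed by function name and arguments, until the shrink target changes:
    #
    #   @self.cached("some_label", i)
    #   def expensive():  # illustrative only
    #       return compute_something(i)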

    def add_new_pass(self, run):
        """Creates a shrink pass corresponding to calling ``run(self)``"""

        definition = SHRINK_PASS_DEFINITIONS[run]

        p = ShrinkPass(
            run_with_chooser=definition.run_with_chooser,
            shrinker=self,
            index=len(self.passes_by_name),
        )
        self.passes_by_name[p.name] = p
        return p

    def shrink_pass(self, name):
        """Return the ShrinkPass object for the pass with the given name."""
        if isinstance(name, ShrinkPass):
            return name
        if name not in self.passes_by_name:
            self.add_new_pass(name)
        return self.passes_by_name[name]

    @property
    def calls(self) -> int:
        """Return the number of calls that have been made to the underlying
        test function."""
        return self.engine.call_count

    @property
    def misaligned(self) -> int:
        return self.engine.misaligned_count

    def check_calls(self) -> None:
        if self.calls - self.calls_at_last_shrink >= self.max_stall:
            raise StopShrinking

    def cached_test_function(
        self, nodes: Sequence[ChoiceNode]
    ) -> tuple[bool, Optional[Union[ConjectureResult, _Overrun]]]:
        nodes = nodes[: len(self.nodes)]

        if startswith(nodes, self.nodes):
            return (True, None)

        if sort_key(self.nodes) < sort_key(nodes):
            return (False, None)

        # sometimes our shrinking passes try obviously invalid things. We handle
        # discarding them in one place here.
        if any(not choice_permitted(node.value, node.constraints) for node in nodes):
            return (False, None)

        result = self.engine.cached_test_function(
            [n.value for n in nodes], extend=self.__extend
        )
        previous = self.shrink_target
        self.incorporate_test_data(result)
        self.check_calls()
        return (previous is not self.shrink_target, result)

    def consider_new_nodes(self, nodes: Sequence[ChoiceNode]) -> bool:
        return self.cached_test_function(nodes)[0]


    def incorporate_test_data(self, data):
        """Takes a ConjectureData or Overrun object and updates the current
        shrink_target if this data represents an improvement over it."""
        if data.status < Status.VALID or data is self.shrink_target:
            return
        if (
            self.__predicate(data)
            and sort_key(data.nodes) < sort_key(self.shrink_target.nodes)
            and self.__allow_transition(self.shrink_target, data)
        ):
            self.update_shrink_target(data)


    def debug(self, msg: str) -> None:
        self.engine.debug(msg)

    @property
    def random(self) -> "Random":
        return self.engine.random

    def shrink(self) -> None:
        """Run the full set of shrinks and update shrink_target.

        This method is "mostly idempotent" - calling it twice is unlikely to
        have any effect, though it has a non-zero probability of doing so.
        """

        try:
            self.initial_coarse_reduction()
            self.greedy_shrink()
        except StopShrinking:
            # If we stopped shrinking because we're making slow progress (instead of
            # reaching a local optimum), don't run the explain-phase logic.
            self.should_explain = False
        finally:
            if self.engine.report_debug_info:

                def s(n):
                    return "s" if n != 1 else ""

                total_deleted = self.initial_size - len(self.shrink_target.choices)
                calls = self.engine.call_count - self.initial_calls
                misaligned = self.engine.misaligned_count - self.initial_misaligned

                self.debug(
                    "---------------------\n"
                    "Shrink pass profiling\n"
                    "---------------------\n\n"
                    f"Shrinking made a total of {calls} call{s(calls)} of which "
                    f"{self.shrinks} shrank and {misaligned} were misaligned. This "
                    f"deleted {total_deleted} choices out of {self.initial_size}."
                )
                for useful in [True, False]:
                    self.debug("")
                    if useful:
                        self.debug("Useful passes:")
                    else:
                        self.debug("Useless passes:")
                    self.debug("")
                    for p in sorted(
                        self.passes_by_name.values(),
                        key=lambda t: (-t.calls, t.deletions, t.shrinks),
                    ):
                        if p.calls == 0:
                            continue
                        if (p.shrinks != 0) != useful:
                            continue

                        self.debug(
                            f"  * {p.name} made {p.calls} call{s(p.calls)} of which "
                            f"{p.shrinks} shrank and {p.misaligned} were misaligned, "
                            f"deleting {p.deletions} choice{s(p.deletions)}."
                        )
                self.debug("")
        self.explain()


    def explain(self) -> None:
        if not self.should_explain or not self.shrink_target.arg_slices:
            return

        self.max_stall = 2**100
        shrink_target = self.shrink_target
        nodes = self.nodes
        choices = self.choices
        chunks: dict[tuple[int, int], list[tuple[ChoiceT, ...]]] = defaultdict(list)

        # Before we start running experiments, let's check for known inputs which would
        # make them redundant. The shrinking process means that we've already tried many
        # variations on the minimal example, so this can save a lot of time.
        seen_passing_seq = self.engine.passing_choice_sequences(
            prefix=self.nodes[: min(self.shrink_target.arg_slices)[0]]
        )

        # Now that we've shrunk to a minimal failing example, it's time to try
        # varying each part that we've noted will go in the final report. Consider
        # slices in largest-first order.
        for start, end in sorted(
            self.shrink_target.arg_slices, key=lambda x: (-(x[1] - x[0]), x)
        ):
            # Check for any previous examples that match the prefix and suffix,
            # so we can skip if we found a passing example while shrinking.
            if any(
                startswith(seen, nodes[:start]) and endswith(seen, nodes[end:])
                for seen in seen_passing_seq
            ):
                continue

            # Run our experiments
            n_same_failures = 0
            note = "or any other generated value"
            # TODO: is 100 same-failures out of 500 attempts a good heuristic?
            for n_attempt in range(500):  # pragma: no branch
                # no-branch here because we don't coverage-test the abort-at-500 logic.

                if n_attempt - 10 > n_same_failures * 5:
                    # stop early if we're seeing mostly invalid examples
                    break  # pragma: no cover

                # replace start:end with random values
                replacement = []
                for i in range(start, end):
                    node = nodes[i]
                    if not node.was_forced:
                        value = draw_choice(
                            node.type, node.constraints, random=self.random
                        )
                        node = node.copy(with_value=value)
                    replacement.append(node.value)

                attempt = choices[:start] + tuple(replacement) + choices[end:]
                result = self.engine.cached_test_function(attempt, extend="full")

                if result.status is Status.OVERRUN:
                    continue  # pragma: no cover  # flakily covered
                result = cast(ConjectureResult, result)
                if not (
                    len(attempt) == len(result.choices)
                    and endswith(result.nodes, nodes[end:])
                ):
                    # Turns out this was a variable-length part, so grab the infix...
                    for span1, span2 in zip(shrink_target.spans, result.spans):
                        assert span1.start == span2.start
                        assert span1.start <= start
                        assert span1.label == span2.label
                        if span1.start == start and span1.end == end:
                            result_end = span2.end
                            break
                    else:
                        raise NotImplementedError("Expected matching prefixes")

                    attempt = (
                        choices[:start]
                        + result.choices[start:result_end]
                        + choices[end:]
                    )
                    chunks[(start, end)].append(result.choices[start:result_end])
                    result = self.engine.cached_test_function(attempt)

                    if result.status is Status.OVERRUN:
                        continue  # pragma: no cover  # flakily covered
                    result = cast(ConjectureResult, result)
                else:
                    chunks[(start, end)].append(result.choices[start:end])

                if shrink_target is not self.shrink_target:  # pragma: no cover
                    # If we've shrunk further without meaning to, bail out.
                    self.shrink_target.slice_comments.clear()
                    return
                if result.status is Status.VALID:
                    # The test passed, indicating that this param can't vary freely.
                    # However, it's really hard to write a simple and reliable covering
                    # test, because of our `seen_passing_seq` check above.
                    break  # pragma: no cover
                elif self.__predicate(result):  # pragma: no branch
                    n_same_failures += 1
                    if n_same_failures >= 100:
                        self.shrink_target.slice_comments[(start, end)] = note
                        break

        # Finally, if we've found multiple independently-variable parts, check whether
        # they can all be varied together.
        if len(self.shrink_target.slice_comments) <= 1:
            return
        n_same_failures_together = 0
        chunks_by_start_index = sorted(chunks.items())
        for _ in range(500):  # pragma: no branch
            # no-branch here because we don't coverage-test the abort-at-500 logic.
            new_choices: list[ChoiceT] = []
            prev_end = 0
            for (start, end), ls in chunks_by_start_index:
                assert prev_end <= start < end, "these chunks must be nonoverlapping"
                new_choices.extend(choices[prev_end:start])
                new_choices.extend(self.random.choice(ls))
                prev_end = end

            result = self.engine.cached_test_function(new_choices)

            # This *can't* be a shrink because none of the components were.
            assert shrink_target is self.shrink_target
            if result.status == Status.VALID:
                self.shrink_target.slice_comments[(0, 0)] = (
                    "The test sometimes passed when commented parts were varied together."
                )
                break  # Test passed, this param can't vary freely.
            elif self.__predicate(result):  # pragma: no branch
                n_same_failures_together += 1
                if n_same_failures_together >= 100:
                    self.shrink_target.slice_comments[(0, 0)] = (
                        "The test always failed when commented parts were varied together."
                    )
                    break


    def greedy_shrink(self) -> None:
        """Run a full set of greedy shrinks (that is, ones that will only ever
        move to a better target) and update shrink_target appropriately.

        This method iterates to a fixed point and so is idempotent - calling
        it twice will have exactly the same effect as calling it once.
        """
        self.fixate_shrink_passes(
            [
                "try_trivial_spans",
                node_program("X" * 5),
                node_program("X" * 4),
                node_program("X" * 3),
                node_program("X" * 2),
                node_program("X" * 1),
                "pass_to_descendant",
                "reorder_spans",
                "minimize_duplicated_choices",
                "minimize_individual_choices",
                "redistribute_numeric_pairs",
                "lower_integers_together",
                "lower_duplicated_characters",
            ]
        )


    def initial_coarse_reduction(self):
        """Performs some preliminary reductions that should not be
        repeated as part of the main shrink passes.

        The main reason why these can't be included as part of shrink
        passes is that they have much more ability to make the test
        case "worse". e.g. they might rerandomise part of it, significantly
        increasing the value of individual nodes, which works in direct
        opposition to the lexical shrinking and will frequently undo
        its work.
        """
        self.reduce_each_alternative()

    @derived_value  # type: ignore
    def spans_starting_at(self):
        result = [[] for _ in self.shrink_target.nodes]
        for i, ex in enumerate(self.spans):
            # We can have zero-length spans that start at the end
            if ex.start < len(result):
                result[ex.start].append(i)
        return tuple(map(tuple, result))


    def reduce_each_alternative(self):
        """This is a pass that is designed to rerandomise use of the
        one_of strategy or things that look like it, in order to try
        to move from later strategies to earlier ones in the branch
        order.

        It does this by trying to systematically lower each value it
        finds that looks like it might be the branch decision for
        one_of, and then attempts to repair any changes in shape that
        this causes.
        """
        i = 0
        while i < len(self.shrink_target.nodes):
            nodes = self.shrink_target.nodes
            node = nodes[i]
            if (
                node.type == "integer"
                and not node.was_forced
                and node.value <= 10
                and node.constraints["min_value"] == 0
            ):
                assert isinstance(node.value, int)

                # We've found a plausible candidate for a ``one_of`` choice.
                # We now want to see if the shape of the test case actually depends
                # on it. If it doesn't, then we don't need to do this (comparatively
                # costly) pass, and can let much simpler lexicographic reduction
                # handle it later.
                #
                # We test this by trying to set the value to zero and seeing if the
                # shape changes, as measured by either changing the number of subsequent
                # nodes, or changing the nodes in such a way as to cause one of the
                # previous values to no longer be valid in its position.
                zero_attempt = self.cached_test_function(
                    nodes[:i] + (nodes[i].copy(with_value=0),) + nodes[i + 1 :]
                )[1]
                if (
                    zero_attempt is not self.shrink_target
                    and zero_attempt is not None
                    and zero_attempt.status >= Status.VALID
                ):
                    changed_shape = len(zero_attempt.nodes) != len(nodes)

                    if not changed_shape:
                        for j in range(i + 1, len(nodes)):
                            zero_node = zero_attempt.nodes[j]
                            orig_node = nodes[j]
                            if (
                                zero_node.type != orig_node.type
                                or not choice_permitted(
                                    orig_node.value, zero_node.constraints
                                )
                            ):
                                changed_shape = True
                                break
                    if changed_shape:
                        for v in range(node.value):
                            if self.try_lower_node_as_alternative(i, v):
                                break
            i += 1


    def try_lower_node_as_alternative(self, i, v):
        """Attempt to lower `self.shrink_target.nodes[i]` to `v`,
        while rerandomising and attempting to repair any subsequent
        changes to the shape of the test case that this causes."""
        nodes = self.shrink_target.nodes
        if self.consider_new_nodes(
            nodes[:i] + (nodes[i].copy(with_value=v),) + nodes[i + 1 :]
        ):
            return True

        prefix = nodes[:i] + (nodes[i].copy(with_value=v),)
        initial = self.shrink_target
        spans = self.spans_starting_at[i]
        for _ in range(3):
            random_attempt = self.engine.cached_test_function(
                [n.value for n in prefix], extend=len(nodes)
            )
            if random_attempt.status < Status.VALID:
                continue
            self.incorporate_test_data(random_attempt)
            for j in spans:
                initial_ex = initial.spans[j]
                attempt_ex = random_attempt.spans[j]
                contents = random_attempt.nodes[attempt_ex.start : attempt_ex.end]
                self.consider_new_nodes(nodes[:i] + contents + nodes[initial_ex.end :])
                if initial is not self.shrink_target:
                    return True
        return False


    @derived_value  # type: ignore
    def shrink_pass_choice_trees(self):
        return defaultdict(ChoiceTree)

    def fixate_shrink_passes(self, passes):
        """Run steps from each pass in ``passes`` until the current shrink target
        is a fixed point of all of them."""
        passes = list(map(self.shrink_pass, passes))

        any_ran = True
        while any_ran:
            any_ran = False

            reordering = {}

            # We run remove_discarded after every pass to do cleanup,
            # keeping track of whether that actually works. Either there is
            # no discarded data and it is basically free, or it reliably works
            # and deletes data, or it doesn't work. In that latter case we turn
            # it off for the rest of this loop through the passes, but will
            # try again once all of the passes have been run.
            can_discard = self.remove_discarded()

            calls_at_loop_start = self.calls

            # We keep track of how many calls can be made by a single step
            # without making progress and use this to test how much to pad
            # out self.max_stall by as we go along.
            max_calls_per_failing_step = 1

            for sp in passes:
                if can_discard:
                    can_discard = self.remove_discarded()

                before_sp = self.shrink_target

                # Run the shrink pass until it fails to make any progress
                # max_failures times in a row. This implicitly boosts shrink
                # passes that are more likely to work.
                failures = 0
                max_failures = 20
                while failures < max_failures:
                    # We don't allow more than max_stall consecutive failures
                    # to shrink, but this means that if we're unlucky and the
                    # shrink passes are in a bad order where only the ones at
                    # the end are useful, if we're not careful this heuristic
                    # might stop us before we've tried everything. In order to
                    # avoid that happening, we make sure that there's always
                    # plenty of breathing room to make it through a single
                    # iteration of the fixate_shrink_passes loop.
                    self.max_stall = max(
                        self.max_stall,
                        2 * max_calls_per_failing_step
                        + (self.calls - calls_at_loop_start),
                    )

                    prev = self.shrink_target
                    initial_calls = self.calls
                    # It's better for us to run shrink passes in a deterministic
                    # order, to avoid repeat work, but this can cause us to create
                    # long stalls when there are a lot of steps which fail to do
                    # anything useful. In order to avoid this, once we've noticed
                    # we're in a stall (i.e. half of max_failures calls have failed
                    # to do anything) we switch to randomly jumping around. If we
                    # find a success then we'll resume deterministic order from
                    # there which, with any luck, is in a new good region.
                    if not sp.step(random_order=failures >= max_failures // 2):
                        # step returns False when there is nothing to do because
                        # the entire choice tree is exhausted. If this happens
                        # we break because we literally can't run this pass any
                        # more than we already have until something else makes
                        # progress.
                        break
                    any_ran = True

                    # Don't count steps that didn't actually try to do
                    # anything as failures. Otherwise, this call is a failure
                    # if it failed to make any changes to the shrink target.
                    if initial_calls != self.calls:
                        if prev is not self.shrink_target:
                            failures = 0
                        else:
                            max_calls_per_failing_step = max(
                                max_calls_per_failing_step, self.calls - initial_calls
                            )
                            failures += 1

                # We reorder the shrink passes so that on our next run through
                # we try good ones first. The rule is that shrink passes that
                # did nothing useful are the worst, shrink passes that reduced
                # the length are the best.
                if self.shrink_target is before_sp:
                    reordering[sp] = 1
                elif len(self.choices) < len(before_sp.choices):
                    reordering[sp] = -1
                else:
                    reordering[sp] = 0

            passes.sort(key=reordering.__getitem__)


    @property
    def nodes(self) -> tuple[ChoiceNode, ...]:
        return self.shrink_target.nodes

    @property
    def choices(self) -> tuple[ChoiceT, ...]:
        return self.shrink_target.choices

    @property
    def spans(self) -> Spans:
        return self.shrink_target.spans

    @derived_value  # type: ignore
    def spans_by_label(self):
        """
        A mapping of labels to a list of spans with that label. Spans in the list
        are ordered by their normal index order.
        """

        spans_by_label = defaultdict(list)
        for ex in self.spans:
            spans_by_label[ex.label].append(ex)
        return dict(spans_by_label)

    @derived_value  # type: ignore
    def distinct_labels(self):
        return sorted(self.spans_by_label, key=str)


    @defines_shrink_pass()
    def pass_to_descendant(self, chooser):
        """Attempt to replace each span with a descendant span.

        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:

            binary_tree = st.deferred(
                lambda: st.one_of(
                    st.integers(), st.tuples(binary_tree, binary_tree)))

        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.

        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """

        label = chooser.choose(
            self.distinct_labels, lambda l: len(self.spans_by_label[l]) >= 2
        )

        ls = self.spans_by_label[label]
        i = chooser.choose(range(len(ls) - 1))
        ancestor = ls[i]

        if i + 1 == len(ls) or ls[i + 1].start >= ancestor.end:
            return

        @self.cached(label, i)
        def descendants():
            lo = i + 1
            hi = len(ls)
            while lo + 1 < hi:
                mid = (lo + hi) // 2
                if ls[mid].start >= ancestor.end:
                    hi = mid
                else:
                    lo = mid
            return [t for t in ls[i + 1 : hi] if t.choice_count < ancestor.choice_count]

        descendant = chooser.choose(descendants, lambda ex: ex.choice_count > 0)

        assert ancestor.start <= descendant.start
        assert ancestor.end >= descendant.end
        assert descendant.choice_count < ancestor.choice_count

        self.consider_new_nodes(
            self.nodes[: ancestor.start]
            + self.nodes[descendant.start : descendant.end]
            + self.nodes[ancestor.end :]
        )


    def lower_common_node_offset(self):
        """Sometimes we find ourselves in a situation where changes to one part
        of the choice sequence unlock changes to other parts. Sometimes this is
        good, but sometimes this can cause us to exhibit exponential slow
        downs!

        e.g. suppose we had the following:

            m = draw(integers(min_value=0))
            n = draw(integers(min_value=0))
            assert abs(m - n) > 1

        If this fails then we'll end up with a loop where on each iteration we
        reduce each of m and n by 2 - m can't go lower because of n, then n
        can't go lower because of m.

        This will take us O(m) iterations to complete, which is exponential in
        the data size, as we gradually zig zag our way towards zero.

        This can only happen if we're failing to reduce the size of the choice
        sequence: The number of iterations that reduce the length of the choice
        sequence is bounded by that length.

        So what we do is this: We keep track of which blocks are changing, and
        then if there's some non-zero common offset to them we try and minimize
        them all at once by lowering that offset. e.g. from (1000, 1001) this
        lets us jump towards (0, 1) in O(log(offset)) calls rather than
        zig-zagging down in steps of two.

        This may not work, and it definitely won't get us out of all possible
        exponential slow downs (an example of where it doesn't is where the
        shape of the blocks changes as a result of this bouncing behaviour),
        but it fails fast when it doesn't work and gets us out of a really
        nastily slow case when it does.
        """
        if len(self.__changed_nodes) <= 1:
            return

        changed = []
        for i in sorted(self.__changed_nodes):
            node = self.nodes[i]
            if node.trivial or node.type != "integer":
                continue
            changed.append(node)

        if not changed:
            return

        ints = [
            abs(node.value - node.constraints["shrink_towards"]) for node in changed
        ]
        offset = min(ints)
        assert offset > 0

        for i in range(len(ints)):
            ints[i] -= offset

        st = self.shrink_target

        def offset_node(node, n):
            return (
                node.index,
                node.index + 1,
                [node.copy(with_value=node.constraints["shrink_towards"] + n)],
            )

        def consider(n, sign):
            return self.consider_new_nodes(
                replace_all(
                    st.nodes,
                    [
                        offset_node(node, sign * (n + v))
                        for node, v in zip(changed, ints)
                    ],
                )
            )

        # shrink from both sides
        Integer.shrink(offset, lambda n: consider(n, 1))
        Integer.shrink(offset, lambda n: consider(n, -1))
        self.clear_change_tracking()


    def clear_change_tracking(self):
        self.__last_checked_changed_at = self.shrink_target
        self.__all_changed_nodes = set()

    def mark_changed(self, i):
        self.__changed_nodes.add(i)

    @property
    def __changed_nodes(self) -> set[int]:
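        """The set of indices of nodes whose values differ between the
        previously-checked shrink target and the current one, recomputed
        lazily and cached until the shrink target changes again."""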

        if self.__last_checked_changed_at is self.shrink_target:
            return self.__all_changed_nodes

        prev_target = self.__last_checked_changed_at
        new_target = self.shrink_target
        assert prev_target is not new_target
        prev_nodes = prev_target.nodes
        new_nodes = new_target.nodes
        assert sort_key(new_target.nodes) < sort_key(prev_target.nodes)

        if len(prev_nodes) != len(new_nodes) or any(
            n1.type != n2.type for n1, n2 in zip(prev_nodes, new_nodes)
        ):
            # should we check constraints are equal as well?
            self.__all_changed_nodes = set()
        else:
            assert len(prev_nodes) == len(new_nodes)
            for i, (n1, n2) in enumerate(zip(prev_nodes, new_nodes)):
                assert n1.type == n2.type
                if not choice_equal(n1.value, n2.value):
                    self.__all_changed_nodes.add(i)

        return self.__all_changed_nodes


    def update_shrink_target(self, new_target):
        assert isinstance(new_target, ConjectureResult)
        self.shrinks += 1
        # If we are just taking a long time to shrink we don't want to
        # trigger this heuristic, so whenever we shrink successfully
        # we give ourselves a bit of breathing room to make sure we
        # would find a shrink that took that long to find the next time.
        # The case where we're taking a long time but making steady
        # progress is handled by `finish_shrinking_deadline` in engine.py
        self.max_stall = max(
            self.max_stall, (self.calls - self.calls_at_last_shrink) * 2
        )
        self.calls_at_last_shrink = self.calls
        self.shrink_target = new_target
        self.__derived_values = {}


    def try_shrinking_nodes(self, nodes, n):
        """Attempts to replace each node in the nodes list with n. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target).

        In current usage it is expected that each of the nodes currently has
        the same value and choice_type, although this is not essential. Note that
        n must be < the node at min(nodes) or this is not a valid shrink.

        This method will attempt to do some small amount of work to delete data
        that occurs after the end of the nodes. This is useful for cases where
        there is some size dependency on the value of a node.
        """
        # If the length of the shrink target has changed from under us such that
        # the indices are out of bounds, give up on the replacement.
        # TODO_BETTER_SHRINK: we probably want to narrow down the root cause here at some point.
        if any(node.index >= len(self.nodes) for node in nodes):
            return  # pragma: no cover

        initial_attempt = replace_all(
            self.nodes,
            [(node.index, node.index + 1, [node.copy(with_value=n)]) for node in nodes],
        )

        attempt = self.cached_test_function(initial_attempt)[1]

        if attempt is None:
            return False

        if attempt is self.shrink_target:
            # if the initial shrink was a success, try lowering offsets.
            self.lower_common_node_offset()
            return True

        # If this produced something completely invalid we ditch it
        # here rather than trying to persevere.
        if attempt.status is Status.OVERRUN:
            return False

        if attempt.status is Status.INVALID:
            return False

        if attempt.misaligned_at is not None:
            # we're invalid due to a misalignment in the tree. We'll try to fix
            # a very specific type of misalignment here: where we have a node of
            # {"size": n} and tried to draw the same node, but with {"size": m < n}.
            # This can occur with eg
            #
            #   n = data.draw_integer()
            #   s = data.draw_string(min_size=n)
            #
            # where we try lowering n, resulting in the test_function drawing a lower
            # min_size than our attempt had for the draw_string node.
            #
            # We'll now try realigning this tree by:
            # * replacing the constraints in our attempt with what test_function tried
            #   to draw in practice
            # * truncating the value of that node to match min_size
            #
            # This helps in the specific case of drawing a value and then drawing
            # a collection of that size...and not much else. In practice this
            # helps because this antipattern is fairly common.

            # TODO we'll probably want to apply the same trick as in the valid
            # case of this function of preserving from the right instead of
            # preserving from the left. see test_can_shrink_variable_string_draws.

            (index, attempt_choice_type, attempt_constraints, _attempt_forced) = (
                attempt.misaligned_at
            )
            node = self.nodes[index]
            if node.type != attempt_choice_type:
                return False  # pragma: no cover
            if node.was_forced:
                return False  # pragma: no cover

            if node.type in {"string", "bytes"}:
                # if the size *increased*, we would have to guess what to pad with
                # in order to try fixing up this attempt. Just give up.
                if node.constraints["min_size"] <= attempt_constraints["min_size"]:
                    # attempts which increase min_size tend to overrun rather than
                    # be misaligned, making a covering case difficult.
                    return False  # pragma: no cover
                # the size decreased in our attempt. Try again, but truncate the value
                # to that size by removing any elements past min_size.
                return self.consider_new_nodes(
                    initial_attempt[: node.index]
                    + [
                        initial_attempt[node.index].copy(
                            with_constraints=attempt_constraints,
                            with_value=initial_attempt[node.index].value[
                                : attempt_constraints["min_size"]
                            ],
                        )
                    ]
                    + initial_attempt[node.index :]
                )

        lost_nodes = len(self.nodes) - len(attempt.nodes)
        if lost_nodes <= 0:
            return False

        start = nodes[0].index
        end = nodes[-1].index + 1
        # We now look for contiguous regions to delete that might help fix up
        # this failed shrink. We only look for contiguous regions of the right
        # lengths because doing anything more than that starts to get very
        # expensive. See minimize_individual_choices for where we
        # try to be more aggressive.
        regions_to_delete = {(end, end + lost_nodes)}

        for ex in self.spans:
            if ex.start > start:
                continue
            if ex.end <= end:
                continue

            if ex.index >= len(attempt.spans):
                continue  # pragma: no cover

            replacement = attempt.spans[ex.index]
            in_original = [c for c in ex.children if c.start >= end]
            in_replaced = [c for c in replacement.children if c.start >= end]

            if len(in_replaced) >= len(in_original) or not in_replaced:
                continue

            # We've found a span where some of the children went missing
            # as a result of this change, and just replacing it with the data
            # it would have had and removing the spillover didn't work. This
            # means that some of its children towards the right must be
            # important, so we try to arrange it so that it retains its
            # rightmost children instead of its leftmost.
            regions_to_delete.add(
                (in_original[0].start, in_original[-len(in_replaced)].start)
            )

        for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):
            try_with_deleted = initial_attempt[:u] + initial_attempt[v:]
            if self.consider_new_nodes(try_with_deleted):
                return True

        return False


    def remove_discarded(self):
        """Try removing all data marked as discarded.

        This is primarily to deal with data that has been ignored while
        doing rejection sampling - e.g. as a result of an integer range, or a
        filtered strategy.

        Such data will also be handled by the adaptive_example_deletion pass,
        but that pass is necessarily more conservative and will try deleting
        each interval individually. The common case is that all data drawn and
        rejected can just be thrown away immediately in one block, so this pass
        will be much faster than trying each one individually when it works.

        Returns False if there is discarded data and removing it does not work,
        otherwise returns True.
        """
        while self.shrink_target.has_discards:
            discarded = []

            for ex in self.shrink_target.spans:
                if (
                    ex.choice_count > 0
                    and ex.discarded
                    and (not discarded or ex.start >= discarded[-1][-1])
                ):
                    discarded.append((ex.start, ex.end))

            # This can happen if we have discards but they are all of
            # zero length. This shouldn't happen very often so it's
            # faster to check for it here than at the point of example
            # generation.
            if not discarded:
                break

            attempt = list(self.nodes)
            for u, v in reversed(discarded):
                del attempt[u:v]

            if not self.consider_new_nodes(tuple(attempt)):
                return False
        return True


    @derived_value  # type: ignore
    def duplicated_nodes(self):
        """Returns a list of nodes grouped by (choice_type, value)."""
        duplicates = defaultdict(list)
        for node in self.nodes:
            duplicates[(node.type, choice_key(node.value))].append(node)
        return list(duplicates.values())


    @defines_shrink_pass()
    def minimize_duplicated_choices(self, chooser):
        """Find choices that have been duplicated in multiple places and attempt
        to minimize all of the duplicates simultaneously.

        This lets us handle cases where two values can't be shrunk
        independently of each other but can easily be shrunk together.
        For example if we had something like:

            ls = data.draw(lists(integers()))
            y = data.draw(integers())
            assert y not in ls

        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
        to replace both 3s with 0, this would be a valid shrink, but if we were
        to replace either 3 with 0 on its own the test would start passing.

        It is also useful for when that duplication is accidental and the value
        of the blocks doesn't matter very much because it allows us to replace
        more values at once.
        """
        nodes = chooser.choose(self.duplicated_nodes)
        # we can't lower any nodes which are trivial. try proceeding with the
        # remaining nodes.
        nodes = [node for node in nodes if not node.trivial]
        if len(nodes) <= 1:
            return

        self.minimize_nodes(nodes)


    @defines_shrink_pass()
    def redistribute_numeric_pairs(self, chooser):
        """If we have a pair of generated numbers whose sum is required to
        exceed some bound, lowering one of them requires raising the other.
        This pass enables that."""
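        # For example (a hypothetical failing test): with
        #
        #   m = data.draw(integers())
        #   n = data.draw(integers())
        #   assert m + n < 100
        #
        # neither m nor n can be lowered on its own once m + n is exactly
        # 100, but lowering m by k while raising n by k preserves the sum,
        # letting us move towards the simpler pair (0, 100).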

        # look for a pair of nodes (node1, node2) which are both numeric
        # and aren't separated by too many other nodes. We'll decrease node1 and
        # increase node2 (note that the other way around doesn't make sense as
        # it's strictly worse in the ordering).
        def can_choose_node(node):
            # don't choose nan, inf, or floats above the threshold where f + 1 > f
            # (which is not necessarily true for floats above MAX_PRECISE_INTEGER).
            # The motivation for the last condition is to avoid trying weird
            # non-shrinks where we raise one node and think we lowered another
            # (but didn't).
            return node.type in {"integer", "float"} and not (
                node.type == "float"
                and (math.isnan(node.value) or abs(node.value) >= MAX_PRECISE_INTEGER)
            )

        node1 = chooser.choose(
            self.nodes,
            lambda node: can_choose_node(node) and not node.trivial,
        )
        node2 = chooser.choose(
            self.nodes,
            lambda node: can_choose_node(node)
            # Note that it's fine for node2 to be trivial, because we're going to
            # explicitly make it *not* trivial by adding to its value.
            and not node.was_forced
            # to avoid quadratic behavior, scan ahead only a small amount for
            # the related node.
            and node1.index < node.index <= node1.index + 4,
        )

        m: Union[int, float] = node1.value
        n: Union[int, float] = node2.value

        def boost(k: int) -> bool:
            if k > m:
                return False

            try:
                v1 = m - k
                v2 = n + k
            except OverflowError:  # pragma: no cover
                # if n or m is a float and k is over sys.float_info.max, coercing
                # k to a float will overflow.
                return False

            # if we've increased node2 to the point that we're past max precision,
            # give up - things have become too unstable.
            if node1.type == "float" and v2 >= MAX_PRECISE_INTEGER:
                return False

            return self.consider_new_nodes(
                self.nodes[: node1.index]
                + (node1.copy(with_value=v1),)
                + self.nodes[node1.index + 1 : node2.index]
                + (node2.copy(with_value=v2),)
                + self.nodes[node2.index + 1 :]
            )

        find_integer(boost)


    @defines_shrink_pass()
    def lower_integers_together(self, chooser):
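        # A motivating sketch (hypothetical test): when a failure depends only
        # on the *difference* of two integers, e.g.
        #
        #   x = data.draw(integers())
        #   y = data.draw(integers())
        #   assert y - x != 10
        #
        # neither value can be lowered alone, but lowering both by the same
        # amount preserves the difference, which is what this pass tries.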

        node1 = chooser.choose(
            self.nodes, lambda n: n.type == "integer" and not n.trivial
        )
        # Search up to 3 nodes ahead, to avoid quadratic time.
        node2 = self.nodes[
            chooser.choose(
                range(node1.index + 1, min(len(self.nodes), node1.index + 3 + 1)),
                lambda i: self.nodes[i].type == "integer"
                and not self.nodes[i].was_forced,
            )
        ]

        # one might expect us to require node2 to be nontrivial, and to minimize
        # the node which is closer to its shrink_towards, rather than node1
        # unconditionally. In reality, it's acceptable for us to transition node2
        # from trivial to nontrivial, because the shrink ordering is dominated by
        # the complexity of the earlier node1. What matters is minimizing node1.
        shrink_towards = node1.constraints["shrink_towards"]

        def consider(n):
            return self.consider_new_nodes(
                self.nodes[: node1.index]
                + (node1.copy(with_value=node1.value - n),)
                + self.nodes[node1.index + 1 : node2.index]
                + (node2.copy(with_value=node2.value - n),)
                + self.nodes[node2.index + 1 :]
            )

        find_integer(lambda n: consider(shrink_towards - n))
        find_integer(lambda n: consider(n - shrink_towards))


1396 @defines_shrink_pass() 

1397 def lower_duplicated_characters(self, chooser): 

1398 """ 

1399 Select two string choices no more than 4 choices apart and simultaneously 

1400 lower characters which appear in both strings. This helps cases where the 

1401 same character must appear in two strings, but the actual value of the 

1402 character is not relevant. 

1403 

1404 This shrinking pass currently only tries lowering *all* instances of the 

1405 duplicated character in both strings. So for instance, given two choices: 

1406 

1407 "bbac" 

1408 "abbb" 

1409 

1410 we would try lowering all five of the b characters simultaneously. This 

1411 may fail to shrink some cases where only certain character indices are 

1412 correlated, for instance if only the b at index 1 could be lowered 

1413 simultaneously and the rest did in fact actually have to be a `b`. 

1414 

1415 It would be nice to try shrinking that case as well, but we would need good 

1416 safeguards because it could get very expensive to try all combinations. 

1417 I expect lowering all duplicates to handle most cases in the meantime. 

1418 """ 

1419 node1 = chooser.choose( 

1420 self.nodes, lambda n: n.type == "string" and not n.trivial 

1421 ) 

1422 

1423 # limit search to up to 4 choices ahead, to avoid quadratic behavior 

1424 node2 = self.nodes[ 

1425 chooser.choose( 

1426 range(node1.index + 1, min(len(self.nodes), node1.index + 1 + 4)), 

1427 lambda i: self.nodes[i].type == "string" and not self.nodes[i].trivial 

1428 # select nodes which have at least one of the same character present 

1429 and set(node1.value) & set(self.nodes[i].value), 

1430 ) 

1431 ] 

1432 

1433 duplicated_characters = set(node1.value) & set(node2.value) 

1434 # deterministic ordering 

1435 char = chooser.choose(sorted(duplicated_characters)) 

1436 intervals = node1.constraints["intervals"] 

1437 

1438 def copy_node(node, n): 

1439 # replace all duplicate characters in each string. This might miss 

1440 # some shrinks compared to only replacing some, but trying all possible 

1441 # combinations of indices could get expensive if done without some 

1442 # thought. 

1443 return node.copy( 

1444 with_value=node.value.replace(char, intervals.char_in_shrink_order(n)) 

1445 ) 

1446 

1447 Integer.shrink( 

1448 intervals.index_from_char_in_shrink_order(char), 

1449 lambda n: self.consider_new_nodes( 

1450 self.nodes[: node1.index] 

1451 + (copy_node(node1, n),) 

1452 + self.nodes[node1.index + 1 : node2.index] 

1453 + (copy_node(node2, n),) 

1454 + self.nodes[node2.index + 1 :] 

1455 ), 

1456 ) 

1457 
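# Aside: a standalone illustration of the simultaneous lowering described in
# the docstring above (plain str.replace with the docstring's values; the
# real pass walks the constraint's IntervalSet in shrink order rather than
# hardcoding a replacement character):
s1, s2 = "bbac", "abbb"
lowered1 = s1.replace("b", "a")  # every duplicated character replaced at once
lowered2 = s2.replace("b", "a")
assert (lowered1, lowered2) == ("aaac", "aaaa")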

1458 def minimize_nodes(self, nodes): 

1459 choice_type = nodes[0].type 

1460 value = nodes[0].value 

1461 # unlike choice_type and value, constraints are *not* guaranteed to be equal among all 

1462 # passed nodes. We arbitrarily use the constraints of the first node. I think 

1463 # this is unsound (= leads to us trying shrinks that could not have been 

1464 # generated), but those get discarded at test-time, and this enables useful 

1465 # slips where constraints are not equal but are close enough that doing the 

1466 # same operation on both basically just works. 

1467 constraints = nodes[0].constraints 

1468 assert all( 

1469 node.type == choice_type and choice_equal(node.value, value) 

1470 for node in nodes 

1471 ) 

1472 

1473 if choice_type == "integer": 

1474 shrink_towards = constraints["shrink_towards"] 

1475 # try shrinking from both sides towards shrink_towards. 

1476 # we're starting from n = abs(shrink_towards - value). Because the 

1477 # shrinker will not check its starting value, we need to try 

1478 # shrinking to n first. 

1479 self.try_shrinking_nodes(nodes, abs(shrink_towards - value)) 

1480 Integer.shrink( 

1481 abs(shrink_towards - value), 

1482 lambda n: self.try_shrinking_nodes(nodes, shrink_towards + n), 

1483 ) 

1484 Integer.shrink( 

1485 abs(shrink_towards - value), 

1486 lambda n: self.try_shrinking_nodes(nodes, shrink_towards - n), 

1487 ) 

1488 elif choice_type == "float": 

1489 self.try_shrinking_nodes(nodes, abs(value)) 

1490 Float.shrink( 

1491 abs(value), 

1492 lambda val: self.try_shrinking_nodes(nodes, val), 

1493 ) 

1494 Float.shrink( 

1495 abs(value), 

1496 lambda val: self.try_shrinking_nodes(nodes, -val), 

1497 ) 

1498 elif choice_type == "boolean": 

1499 # must be True, otherwise it would be trivial and not selected.

1500 assert value is True 

1501 # only one thing to try: false! 

1502 self.try_shrinking_nodes(nodes, False) 

1503 elif choice_type == "bytes": 

1504 Bytes.shrink( 

1505 value, 

1506 lambda val: self.try_shrinking_nodes(nodes, val), 

1507 min_size=constraints["min_size"], 

1508 ) 

1509 elif choice_type == "string": 

1510 String.shrink( 

1511 value, 

1512 lambda val: self.try_shrinking_nodes(nodes, val), 

1513 intervals=constraints["intervals"], 

1514 min_size=constraints["min_size"], 

1515 ) 

1516 else: 

1517 raise NotImplementedError 

1518 
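# Aside: a worked example of the two-sided integer shrink above, with
# made-up numbers. For value=97 and shrink_towards=5 we start from
# n = abs(5 - 97) = 92 and map each candidate n back through
# shrink_towards +/- n:
value, shrink_towards = 97, 5
n = abs(shrink_towards - value)                       # 92
ups = [shrink_towards + k for k in (n, n // 2, 0)]    # [97, 51, 5]
downs = [shrink_towards - k for k in (n, n // 2, 0)]  # [-87, -41, 5]
assert ups[0] == value and ups[-1] == shrink_towards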

1519 @defines_shrink_pass() 

1520 def try_trivial_spans(self, chooser): 

1521 i = chooser.choose(range(len(self.spans))) 

1522 

1523 prev = self.shrink_target 

1524 nodes = self.shrink_target.nodes 

1525 ex = self.spans[i] 

1526 prefix = nodes[: ex.start] 

1527 replacement = tuple( 

1528 [ 

1529 ( 

1530 node 

1531 if node.was_forced 

1532 else node.copy( 

1533 with_value=choice_from_index(0, node.type, node.constraints) 

1534 ) 

1535 ) 

1536 for node in nodes[ex.start : ex.end] 

1537 ] 

1538 ) 

1539 suffix = nodes[ex.end :] 

1540 attempt = self.cached_test_function(prefix + replacement + suffix)[1] 

1541 

1542 if self.shrink_target is not prev: 

1543 return 

1544 

1545 if isinstance(attempt, ConjectureResult): 

1546 new_ex = attempt.spans[i] 

1547 new_replacement = attempt.nodes[new_ex.start : new_ex.end] 

1548 self.consider_new_nodes(prefix + new_replacement + suffix) 

1549 
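# Aside: a toy model of the replacement built above, with plain integers
# standing in for ChoiceNodes and invented values. Non-forced entries in
# the span collapse to the simplest choice (index 0); forced entries
# survive untouched:
nodes = [3, 7, 5]
start, end, forced = 1, 3, {2}
replacement = tuple(nodes[i] if i in forced else 0 for i in range(start, end))
assert tuple(nodes[:start]) + replacement + tuple(nodes[end:]) == (3, 0, 5)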

1550 @defines_shrink_pass() 

1551 def minimize_individual_choices(self, chooser): 

1552 """Attempt to minimize each choice in sequence. 

1553 

1554 This is the pass that ensures that e.g. each integer we draw is a 

1555 minimum value. So it's the part that guarantees that if we e.g. do 

1556 

1557 x = data.draw(integers()) 

1558 assert x < 10 

1559 

1560 then in our shrunk example, x = 10 rather than, say, 97.

1561 

1562 If we are unsuccessful at minimizing a choice of interest we then 

1563 check if that's because it's changing the size of the test case and, 

1564 if so, we also make an attempt to delete parts of the test case to 

1565 see if that fixes it. 

1566 

1567 We handle most of the common cases in try_shrinking_nodes, which is

1568 pretty good at clearing out large contiguous blocks of dead space, 

1569 but it fails when there is data that has to stay in particular places 

1570 in the list. 

1571 """ 

1572 node = chooser.choose(self.nodes, lambda node: not node.trivial) 

1573 initial_target = self.shrink_target 

1574 

1575 self.minimize_nodes([node]) 

1576 if self.shrink_target is not initial_target: 

1577 # the shrink target changed, so our shrink worked. Defer doing 

1578 # anything more intelligent until this shrink fails. 

1579 return 

1580 

1581 # the shrink failed. One particularly common case where minimizing a 

1582 # node can fail is the antipattern of drawing a size and then drawing a 

1583 # collection of that size, or more generally when there is a size 

1584 # dependency on some single node. We'll explicitly try and fix up this 

1585 # common case here: if decreasing an integer node by one would reduce 

1586 # the size of the generated input, we'll try deleting things after that 

1587 # node and see if the resulting attempt works. 

1588 

1589 if node.type != "integer": 

1590 # Only try this fixup logic on integer draws. Almost all size 

1591 # dependencies are on integer draws, and if this one is not, the test is

1592 # doing something convoluted enough that it is unlikely to shrink well anyway.

1593 # TODO: extend to floats? we probably currently fail on the following,

1594 # albeit convoluted, example:

1595 # n = int(data.draw(st.floats())) 

1596 # s = data.draw(st.lists(st.integers(), min_size=n, max_size=n)) 

1597 return 

1598 

1599 lowered = ( 

1600 self.nodes[: node.index] 

1601 + (node.copy(with_value=node.value - 1),) 

1602 + self.nodes[node.index + 1 :] 

1603 ) 

1604 attempt = self.cached_test_function(lowered)[1] 

1605 if ( 

1606 attempt is None 

1607 or attempt.status < Status.VALID 

1608 or len(attempt.nodes) == len(self.nodes) 

1609 or len(attempt.nodes) == node.index + 1 

1610 ): 

1611 # no point in trying our size-dependency-logic if our attempt at 

1612 # lowering the node resulted in: 

1613 # * an invalid conjecture data 

1614 # * the same number of nodes as before 

1615 # * no nodes beyond the lowered node (nothing to try to delete afterwards) 

1616 return 

1617 

1618 # If the attempt were our new shrink target then the original shrink

1619 # should have worked and we could never have got here.

1620 assert attempt is not self.shrink_target 

1621 

1622 @self.cached(node.index) 

1623 def first_span_after_node(): 

1624 lo = 0 

1625 hi = len(self.spans) 

1626 while lo + 1 < hi: 

1627 mid = (lo + hi) // 2 

1628 ex = self.spans[mid] 

1629 if ex.start >= node.index: 

1630 hi = mid 

1631 else: 

1632 lo = mid 

1633 return hi 

1634 

1635 # we try deleting both entire spans and single nodes.

1636 # If we wanted to get more aggressive, we could try deleting n 

1637 # consecutive nodes (that don't cross a span boundary) for say 

1638 # n <= 2 or n <= 3. 

1639 if chooser.choose([True, False]): 

1640 ex = self.spans[ 

1641 chooser.choose( 

1642 range(first_span_after_node, len(self.spans)), 

1643 lambda i: self.spans[i].choice_count > 0, 

1644 ) 

1645 ] 

1646 self.consider_new_nodes(lowered[: ex.start] + lowered[ex.end :]) 

1647 else: 

1648 node = self.nodes[chooser.choose(range(node.index + 1, len(self.nodes)))] 

1649 self.consider_new_nodes(lowered[: node.index] + lowered[node.index + 1 :]) 

1650 
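# Aside: the size-dependency antipattern targeted above, written as the
# user-level test it typically comes from (illustrative only; the names and
# bounds are invented):
import hypothesis.strategies as st
from hypothesis import given

@given(st.data())
def test_size_dependency(data):
    n = data.draw(st.integers(0, 100))
    xs = [data.draw(st.integers()) for _ in range(n)]
    assert sum(xs) < 1000  # lowering n alone misaligns every later draw
# Lowering n *and* deleting a trailing span or node, as tried above, can
# succeed where the plain minimization failed.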

1651 @defines_shrink_pass() 

1652 def reorder_spans(self, chooser): 

1653 """This pass allows us to reorder the children of each span. 

1654 

1655 For example, consider the following: 

1656 

1657 .. code-block:: python 

1658 

1659 import hypothesis.strategies as st 

1660 from hypothesis import given 

1661 

1662 

1663 @given(st.text(), st.text()) 

1664 def test_not_equal(x, y): 

1665 assert x != y 

1666 

1667 Without the ability to reorder x and y this could fail with either

1668 ``x=""``, ``y="0"`` or the other way around. With reordering it will

1669 reliably fail with ``x=""``, ``y="0"``.

1670 """ 

1671 ex = chooser.choose(self.spans) 

1672 label = chooser.choose(ex.children).label 

1673 

1674 spans = [c for c in ex.children if c.label == label] 

1675 if len(spans) <= 1: 

1676 return 

1677 st = self.shrink_target 

1678 endpoints = [(ex.start, ex.end) for ex in spans] 

1679 

1680 Ordering.shrink( 

1681 range(len(spans)), 

1682 lambda indices: self.consider_new_nodes( 

1683 replace_all( 

1684 st.nodes, 

1685 [ 

1686 ( 

1687 u, 

1688 v, 

1689 st.nodes[spans[i].start : spans[i].end], 

1690 ) 

1691 for (u, v), i in zip(endpoints, indices) 

1692 ], 

1693 ) 

1694 ), 

1695 key=lambda i: sort_key(st.nodes[spans[i].start : spans[i].end]), 

1696 ) 

1697 
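# Aside: the reordering objective above, reduced to plain Python (hedged:
# Ordering.shrink reaches this state incrementally via consider_new_nodes
# rather than sorting outright). Simpler span contents move earlier, using
# the same shortlex idea as sort_key:
chunks = ["0", ""]  # contents of two sibling spans with the same label
assert sorted(chunks, key=lambda s: (len(s), s)) == ["", "0"]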

1698 def run_node_program(self, i, description, original, repeats=1): 

1699 """Node programs are a mini-DSL for node rewriting, defined as a sequence 

1700 of commands that can be run at some index into the nodes 

1701 

1702 Commands are: 

1703 

1704 * "X", delete this node 

1705 

1706 This method runs the node program in ``description`` at node index 

1707 ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it 

1708 will attempt to approximate the results of running it that many times. 

1709 

1710 Returns True if this successfully changes the underlying shrink target, 

1711 else False. 

1712 """ 

1713 if i + len(description) > len(original.nodes) or i < 0: 

1714 return False 

1715 attempt = list(original.nodes) 

1716 for _ in range(repeats): 

1717 for k, command in reversed(list(enumerate(description))): 

1718 j = i + k 

1719 if j >= len(attempt): 

1720 return False 

1721 

1722 if command == "X": 

1723 del attempt[j] 

1724 else: 

1725 raise NotImplementedError(f"Unrecognised command {command!r}") 

1726 

1727 return self.consider_new_nodes(attempt) 

1728 
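# Aside: a minimal, standalone illustration of the "X" command semantics
# above, with strings standing in for nodes. Deleting right-to-left keeps
# the earlier indices stable while the list shrinks:
nodes = ["a", "b", "c", "d", "e"]
i, description = 1, "XX"
attempt = list(nodes)
for k, command in reversed(list(enumerate(description))):
    if command == "X":
        del attempt[i + k]
assert attempt == ["a", "d", "e"]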

1729 

1730def shrink_pass_family(f): 

1731 def accept(*args): 

1732 name = "{}({})".format(f.__name__, ", ".join(map(repr, args))) 

1733 if name not in SHRINK_PASS_DEFINITIONS: 

1734 

1735 def run(self, chooser): 

1736 return f(self, chooser, *args) 

1737 

1738 run.__name__ = name 

1739 defines_shrink_pass()(run) 

1740 assert name in SHRINK_PASS_DEFINITIONS 

1741 return name 

1742 

1743 return accept 

1744 

1745 

1746@shrink_pass_family 

1747def node_program(self, chooser, description): 

1748 n = len(description) 

1749 # Adaptively attempt to run the node program at the current 

1750 # index. If this successfully applies the node program ``k`` times 

1751 # then this runs in ``O(log(k))`` test function calls. 

1752 i = chooser.choose(range(len(self.nodes) - n + 1)) 

1753 

1754 # First, run the node program at the chosen index. If this fails, 

1755 # don't do any extra work, so that failure is as cheap as possible. 

1756 if not self.run_node_program(i, description, original=self.shrink_target): 

1757 return 

1758 

1759 # Because we run in a random order we will often find ourselves in the middle 

1760 # of a region where we could run the node program. We thus start by moving 

1761 # left to the beginning of that region, if possible, so that we start from

1762 # the beginning of that region.

1763 def offset_left(k): 

1764 return i - k * n 

1765 

1766 i = offset_left( 

1767 find_integer( 

1768 lambda k: self.run_node_program( 

1769 offset_left(k), description, original=self.shrink_target 

1770 ) 

1771 ) 

1772 ) 

1773 

1774 original = self.shrink_target 

1775 # Now try to run the node program multiple times here.

1776 find_integer( 

1777 lambda k: self.run_node_program(i, description, original=original, repeats=k) 

1778 ) 

1779 
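# Aside: the name that shrink_pass_family registers for a family member is
# pure string formatting, so node_program("XX") becomes the pass named
# node_program('XX'). A standalone check of just the formatting (the helper
# name here is invented):
def _pass_name(family_name, *args):
    return "{}({})".format(family_name, ", ".join(map(repr, args)))

assert _pass_name("node_program", "XX") == "node_program('XX')"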

1780 

1781@attr.s(slots=True, eq=False) 

1782class ShrinkPass: 

1783 run_with_chooser = attr.ib() 

1784 index = attr.ib() 

1785 shrinker = attr.ib() 

1786 

1787 last_prefix = attr.ib(default=()) 

1788 successes = attr.ib(default=0) 

1789 calls = attr.ib(default=0) 

1790 misaligned = attr.ib(default=0) 

1791 shrinks = attr.ib(default=0) 

1792 deletions = attr.ib(default=0) 

1793 

1794 def step(self, *, random_order=False): 

1795 tree = self.shrinker.shrink_pass_choice_trees[self] 

1796 if tree.exhausted: 

1797 return False 

1798 

1799 initial_shrinks = self.shrinker.shrinks 

1800 initial_calls = self.shrinker.calls 

1801 initial_misaligned = self.shrinker.misaligned 

1802 size = len(self.shrinker.shrink_target.choices) 

1803 self.shrinker.engine.explain_next_call_as(self.name) 

1804 

1805 if random_order: 

1806 selection_order = random_selection_order(self.shrinker.random) 

1807 else: 

1808 selection_order = prefix_selection_order(self.last_prefix) 

1809 

1810 try: 

1811 self.last_prefix = tree.step( 

1812 selection_order, 

1813 lambda chooser: self.run_with_chooser(self.shrinker, chooser), 

1814 ) 

1815 finally: 

1816 self.calls += self.shrinker.calls - initial_calls 

1817 self.misaligned += self.shrinker.misaligned - initial_misaligned 

1818 self.shrinks += self.shrinker.shrinks - initial_shrinks 

1819 self.deletions += size - len(self.shrinker.shrink_target.choices) 

1820 self.shrinker.engine.clear_call_explanation() 

1821 return True 

1822 

1823 @property 

1824 def name(self) -> str: 

1825 return self.run_with_chooser.__name__ 

1826 

1827 

1828class StopShrinking(Exception): 

1829 pass