1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12from collections import defaultdict
13from collections.abc import Callable, Sequence
14from dataclasses import dataclass
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Literal,
19 TypeAlias,
20 cast,
21)
22
23from hypothesis.internal.conjecture.choice import (
24 ChoiceNode,
25 ChoiceT,
26 choice_equal,
27 choice_from_index,
28 choice_key,
29 choice_permitted,
30 choice_to_index,
31)
32from hypothesis.internal.conjecture.data import (
33 ConjectureData,
34 ConjectureResult,
35 Spans,
36 Status,
37 _Overrun,
38 draw_choice,
39)
40from hypothesis.internal.conjecture.junkdrawer import (
41 endswith,
42 find_integer,
43 replace_all,
44 startswith,
45)
46from hypothesis.internal.conjecture.shrinking import (
47 Bytes,
48 Float,
49 Integer,
50 Ordering,
51 String,
52)
53from hypothesis.internal.conjecture.shrinking.choicetree import (
54 ChoiceTree,
55 prefix_selection_order,
56 random_selection_order,
57)
58from hypothesis.internal.floats import MAX_PRECISE_INTEGER
59
60if TYPE_CHECKING:
61 from random import Random
62
63 from hypothesis.internal.conjecture.engine import ConjectureRunner
64
65ShrinkPredicateT: TypeAlias = Callable[[ConjectureResult | _Overrun], bool]
66
67
68def sort_key(nodes: Sequence[ChoiceNode]) -> tuple[int, tuple[int, ...]]:
69 """Returns a sort key such that "simpler" choice sequences are smaller than
70 "more complicated" ones.
71
72 We define sort_key so that x is simpler than y if x is shorter than y or if
73 they have the same length and map(choice_to_index, x) < map(choice_to_index, y).
74
75 The reason for using this ordering is:
76
77 1. If x is shorter than y then that means we had to make fewer decisions
78 in constructing the test case when we ran x than we did when we ran y.
79 2. If x is the same length as y then replacing a choice with a lower index
80 choice corresponds to replacing it with a simpler/smaller choice.
81 3. Because choices drawn early in generation potentially get used in more
82 places they potentially have a more significant impact on the final
83 result, so it makes sense to prioritise reducing earlier choices over
84 later ones.
85 """
86 return (
87 len(nodes),
88 tuple(choice_to_index(node.value, node.constraints) for node in nodes),
89 )
90
91
92@dataclass(slots=True, frozen=False)
93class ShrinkPass:
94 function: Any
95 name: str | None = None
96 last_prefix: Any = ()
97
98 # some execution statistics
99 calls: int = 0
100 misaligned: int = 0
101 shrinks: int = 0
102 deletions: int = 0
103
104 def __post_init__(self):
105 if self.name is None:
106 self.name = self.function.__name__
107
108 def __hash__(self):
109 return hash(self.name)
110
111
112class StopShrinking(Exception):
113 pass
114
115
116class Shrinker:
117 """A shrinker is a child object of a ConjectureRunner which is designed to
118 manage the associated state of a particular shrink problem. That is, we
119 have some initial ConjectureData object and some property of interest
120 that it satisfies, and we want to find a ConjectureData object with a
121 shortlex (see sort_key above) smaller choice sequence that exhibits the same
122 property.
123
124 Currently the only property of interest we use is that the status is
125 INTERESTING and the interesting_origin takes on some fixed value, but we
126 may potentially be interested in other use cases later.
127 However we assume that data with a status < VALID never satisfies the predicate.
128
129 The shrinker keeps track of a value shrink_target which represents the
130 current best known ConjectureData object satisfying the predicate.
131 It refines this value by repeatedly running *shrink passes*, which are
132 methods that perform a series of transformations to the current shrink_target
133 and evaluate the underlying test function to find new ConjectureData
134 objects. If any of these satisfy the predicate, the shrink_target
135 is updated automatically. Shrinking runs until no shrink pass can
136 improve the shrink_target, at which point it stops. It may also be
137 terminated if the underlying engine throws RunIsComplete, but that
138 is handled by the calling code rather than the Shrinker.
139
140 =======================
141 Designing Shrink Passes
142 =======================
143
144 Generally a shrink pass is just any function that calls
145 cached_test_function and/or consider_new_nodes a number of times,
146 but there are a couple of useful things to bear in mind.
147
148 A shrink pass *makes progress* if running it changes self.shrink_target
149 (i.e. it tries a shortlex smaller ConjectureData object satisfying
150 the predicate). The desired end state of shrinking is to find a
151 value such that no shrink pass can make progress, i.e. that we
152 are at a local minimum for each shrink pass.
153
154 In aid of this goal, the main invariant that a shrink pass much
155 satisfy is that whether it makes progress must be deterministic.
156 It is fine (encouraged even) for the specific progress it makes
157 to be non-deterministic, but if you run a shrink pass, it makes
158 no progress, and then you immediately run it again, it should
159 never succeed on the second time. This allows us to stop as soon
160 as we have run each shrink pass and seen no progress on any of
161 them.
162
163 This means that e.g. it's fine to try each of N deletions
164 or replacements in a random order, but it's not OK to try N random
165 deletions (unless you have already shrunk at least once, though we
166 don't currently take advantage of this loophole).
167
168 Shrink passes need to be written so as to be robust against
169 change in the underlying shrink target. It is generally safe
170 to assume that the shrink target does not change prior to the
171 point of first modification - e.g. if you change no bytes at
172 index ``i``, all spans whose start is ``<= i`` still exist,
173 as do all blocks, and the data object is still of length
174 ``>= i + 1``. This can only be violated by bad user code which
175 relies on an external source of non-determinism.
176
177 When the underlying shrink_target changes, shrink
178 passes should not run substantially more test_function calls
179 on success than they do on failure. Say, no more than a constant
180 factor more. In particular shrink passes should not iterate to a
181 fixed point.
182
183 This means that shrink passes are often written with loops that
184 are carefully designed to do the right thing in the case that no
185 shrinks occurred and try to adapt to any changes to do a reasonable
186 job. e.g. say we wanted to write a shrink pass that tried deleting
187 each individual choice (this isn't an especially good pass,
188 but it leads to a simple illustrative example), we might do it
189 by iterating over the choice sequence like so:
190
191 .. code-block:: python
192
193 i = 0
194 while i < len(self.shrink_target.nodes):
195 if not self.consider_new_nodes(
196 self.shrink_target.nodes[:i] + self.shrink_target.nodes[i + 1 :]
197 ):
198 i += 1
199
200 The reason for writing the loop this way is that i is always a
201 valid index into the current choice sequence, even if the current sequence
202 changes as a result of our actions. When the choice sequence changes,
203 we leave the index where it is rather than restarting from the
204 beginning, and carry on. This means that the number of steps we
205 run in this case is always bounded above by the number of steps
206 we would run if nothing works.
207
208 Another thing to bear in mind about shrink pass design is that
209 they should prioritise *progress*. If you have N operations that
210 you need to run, you should try to order them in such a way as
211 to avoid stalling, where you have long periods of test function
212 invocations where no shrinks happen. This is bad because whenever
213 we shrink we reduce the amount of work the shrinker has to do
214 in future, and often speed up the test function, so we ideally
215 wanted those shrinks to happen much earlier in the process.
216
217 Sometimes stalls are inevitable of course - e.g. if the pass
218 makes no progress, then the entire thing is just one long stall,
219 but it's helpful to design it so that stalls are less likely
220 in typical behaviour.
221
222 The two easiest ways to do this are:
223
224 * Just run the N steps in random order. As long as a
225 reasonably large proportion of the operations succeed, this
226 guarantees the expected stall length is quite short. The
227 book keeping for making sure this does the right thing when
228 it succeeds can be quite annoying.
229 * When you have any sort of nested loop, loop in such a way
230 that both loop variables change each time. This prevents
231 stalls which occur when one particular value for the outer
232 loop is impossible to make progress on, rendering the entire
233 inner loop into a stall.
234
235 However, although progress is good, too much progress can be
236 a bad sign! If you're *only* seeing successful reductions,
237 that's probably a sign that you are making changes that are
238 too timid. Two useful things to offset this:
239
240 * It's worth writing shrink passes which are *adaptive*, in
241 the sense that when operations seem to be working really
242 well we try to bundle multiple of them together. This can
243 often be used to turn what would be O(m) successful calls
244 into O(log(m)).
245 * It's often worth trying one or two special minimal values
246 before trying anything more fine grained (e.g. replacing
247 the whole thing with zero).
248
249 """
250
251 def derived_value(fn):
252 """It's useful during shrinking to have access to derived values of
253 the current shrink target.
254
255 This decorator allows you to define these as cached properties. They
256 are calculated once, then cached until the shrink target changes, then
257 recalculated the next time they are used."""
258
259 def accept(self):
260 try:
261 return self.__derived_values[fn.__name__]
262 except KeyError:
263 return self.__derived_values.setdefault(fn.__name__, fn(self))
264
265 accept.__name__ = fn.__name__
266 return property(accept)
267
268 def __init__(
269 self,
270 engine: "ConjectureRunner",
271 initial: ConjectureData | ConjectureResult,
272 predicate: ShrinkPredicateT | None,
273 *,
274 allow_transition: (
275 Callable[[ConjectureData | ConjectureResult, ConjectureData], bool] | None
276 ),
277 explain: bool,
278 in_target_phase: bool = False,
279 ):
280 """Create a shrinker for a particular engine, with a given starting
281 point and predicate. When shrink() is called it will attempt to find an
282 example for which predicate is True and which is strictly smaller than
283 initial.
284
285 Note that initial is a ConjectureData object, and predicate
286 takes ConjectureData objects.
287 """
288 assert predicate is not None or allow_transition is not None
289 self.engine = engine
290 self.__predicate = predicate or (lambda data: True)
291 self.__allow_transition = allow_transition or (lambda source, destination: True)
292 self.__derived_values: dict = {}
293
294 self.initial_size = len(initial.choices)
295 # We keep track of the current best example on the shrink_target
296 # attribute.
297 self.shrink_target = initial
298 self.clear_change_tracking()
299 self.shrinks = 0
300
301 # We terminate shrinks that seem to have reached their logical
302 # conclusion: If we've called the underlying test function at
303 # least self.max_stall times since the last time we shrunk,
304 # it's time to stop shrinking.
305 self.max_stall = 200
306 self.initial_calls = self.engine.call_count
307 self.initial_misaligned = self.engine.misaligned_count
308 self.calls_at_last_shrink = self.initial_calls
309
310 self.shrink_passes: list[ShrinkPass] = [
311 ShrinkPass(self.try_trivial_spans),
312 self.node_program("X" * 5),
313 self.node_program("X" * 4),
314 self.node_program("X" * 3),
315 self.node_program("X" * 2),
316 self.node_program("X" * 1),
317 ShrinkPass(self.pass_to_descendant),
318 ShrinkPass(self.reorder_spans),
319 ShrinkPass(self.minimize_duplicated_choices),
320 ShrinkPass(self.minimize_individual_choices),
321 ShrinkPass(self.redistribute_numeric_pairs),
322 ShrinkPass(self.lower_integers_together),
323 ShrinkPass(self.lower_duplicated_characters),
324 ]
325
326 # Because the shrinker is also used to `pareto_optimise` in the target phase,
327 # we sometimes want to allow extending buffers instead of aborting at the end.
328 self.__extend: Literal["full"] | int = "full" if in_target_phase else 0
329 self.should_explain = explain
330
331 @derived_value # type: ignore
332 def cached_calculations(self):
333 return {}
334
335 def cached(self, *keys):
336 def accept(f):
337 cache_key = (f.__name__, *keys)
338 try:
339 return self.cached_calculations[cache_key]
340 except KeyError:
341 return self.cached_calculations.setdefault(cache_key, f())
342
343 return accept
344
345 @property
346 def calls(self) -> int:
347 """Return the number of calls that have been made to the underlying
348 test function."""
349 return self.engine.call_count
350
351 @property
352 def misaligned(self) -> int:
353 return self.engine.misaligned_count
354
355 def check_calls(self) -> None:
356 if self.calls - self.calls_at_last_shrink >= self.max_stall:
357 raise StopShrinking
358
359 def cached_test_function(
360 self, nodes: Sequence[ChoiceNode]
361 ) -> tuple[bool, ConjectureResult | _Overrun | None]:
362 nodes = nodes[: len(self.nodes)]
363
364 if startswith(nodes, self.nodes):
365 return (True, None)
366
367 if sort_key(self.nodes) < sort_key(nodes):
368 return (False, None)
369
370 # sometimes our shrinking passes try obviously invalid things. We handle
371 # discarding them in one place here.
372 if any(not choice_permitted(node.value, node.constraints) for node in nodes):
373 return (False, None)
374
375 result = self.engine.cached_test_function(
376 [n.value for n in nodes], extend=self.__extend
377 )
378 previous = self.shrink_target
379 self.incorporate_test_data(result)
380 self.check_calls()
381 return (previous is not self.shrink_target, result)
382
383 def consider_new_nodes(self, nodes: Sequence[ChoiceNode]) -> bool:
384 return self.cached_test_function(nodes)[0]
385
386 def incorporate_test_data(self, data):
387 """Takes a ConjectureData or Overrun object updates the current
388 shrink_target if this data represents an improvement over it."""
389 if data.status < Status.VALID or data is self.shrink_target:
390 return
391 if (
392 self.__predicate(data)
393 and sort_key(data.nodes) < sort_key(self.shrink_target.nodes)
394 and self.__allow_transition(self.shrink_target, data)
395 ):
396 self.update_shrink_target(data)
397
398 def debug(self, msg: str) -> None:
399 self.engine.debug(msg)
400
401 @property
402 def random(self) -> "Random":
403 return self.engine.random
404
405 def shrink(self) -> None:
406 """Run the full set of shrinks and update shrink_target.
407
408 This method is "mostly idempotent" - calling it twice is unlikely to
409 have any effect, though it has a non-zero probability of doing so.
410 """
411
412 try:
413 self.initial_coarse_reduction()
414 self.greedy_shrink()
415 except StopShrinking:
416 # If we stopped shrinking because we're making slow progress (instead of
417 # reaching a local optimum), don't run the explain-phase logic.
418 self.should_explain = False
419 finally:
420 if self.engine.report_debug_info:
421
422 def s(n):
423 return "s" if n != 1 else ""
424
425 total_deleted = self.initial_size - len(self.shrink_target.choices)
426 calls = self.engine.call_count - self.initial_calls
427 misaligned = self.engine.misaligned_count - self.initial_misaligned
428
429 self.debug(
430 "---------------------\n"
431 "Shrink pass profiling\n"
432 "---------------------\n\n"
433 f"Shrinking made a total of {calls} call{s(calls)} of which "
434 f"{self.shrinks} shrank and {misaligned} were misaligned. This "
435 f"deleted {total_deleted} choices out of {self.initial_size}."
436 )
437 for useful in [True, False]:
438 self.debug("")
439 if useful:
440 self.debug("Useful passes:")
441 else:
442 self.debug("Useless passes:")
443 self.debug("")
444 for pass_ in sorted(
445 self.shrink_passes,
446 key=lambda t: (-t.calls, t.deletions, t.shrinks),
447 ):
448 if pass_.calls == 0:
449 continue
450 if (pass_.shrinks != 0) != useful:
451 continue
452
453 self.debug(
454 f" * {pass_.name} made {pass_.calls} call{s(pass_.calls)} of which "
455 f"{pass_.shrinks} shrank and {pass_.misaligned} were misaligned, "
456 f"deleting {pass_.deletions} choice{s(pass_.deletions)}."
457 )
458 self.debug("")
459 self.explain()
460
461 def explain(self) -> None:
462
463 if not self.should_explain or not self.shrink_target.arg_slices:
464 return
465
466 self.max_stall = 2**100
467 shrink_target = self.shrink_target
468 nodes = self.nodes
469 choices = self.choices
470 chunks: dict[tuple[int, int], list[tuple[ChoiceT, ...]]] = defaultdict(list)
471
472 # Before we start running experiments, let's check for known inputs which would
473 # make them redundant. The shrinking process means that we've already tried many
474 # variations on the minimal example, so this can save a lot of time.
475 seen_passing_seq = self.engine.passing_choice_sequences(
476 prefix=self.nodes[: min(self.shrink_target.arg_slices)[0]]
477 )
478
479 # Now that we've shrunk to a minimal failing example, it's time to try
480 # varying each part that we've noted will go in the final report. Consider
481 # slices in largest-first order
482 for start, end in sorted(
483 self.shrink_target.arg_slices, key=lambda x: (-(x[1] - x[0]), x)
484 ):
485 # Check for any previous examples that match the prefix and suffix,
486 # so we can skip if we found a passing example while shrinking.
487 if any(
488 startswith(seen, nodes[:start]) and endswith(seen, nodes[end:])
489 for seen in seen_passing_seq
490 ):
491 continue
492
493 # Skip slices that are subsets of already-explained slices.
494 # If a larger slice can vary freely, so can its sub-slices.
495 # Note: (0, 0) is a special marker for the "together" comment that
496 # applies to the whole test, not a specific slice, so we exclude it.
497 if any(
498 s <= start and end <= e
499 for s, e in self.shrink_target.slice_comments
500 if (s, e) != (0, 0)
501 ):
502 continue
503
504 # Run our experiments
505 n_same_failures = 0
506 note = "or any other generated value"
507 # TODO: is 100 same-failures out of 500 attempts a good heuristic?
508 for n_attempt in range(500): # pragma: no branch
509 # no-branch here because we don't coverage-test the abort-at-500 logic.
510
511 if n_attempt - 10 > n_same_failures * 5:
512 # stop early if we're seeing mostly invalid examples
513 break # pragma: no cover
514
515 # replace start:end with random values
516 replacement = []
517 for i in range(start, end):
518 node = nodes[i]
519 if not node.was_forced:
520 value = draw_choice(
521 node.type, node.constraints, random=self.random
522 )
523 node = node.copy(with_value=value)
524 replacement.append(node.value)
525
526 attempt = choices[:start] + tuple(replacement) + choices[end:]
527 result = self.engine.cached_test_function(attempt, extend="full")
528
529 if result.status is Status.OVERRUN:
530 continue # pragma: no cover # flakily covered
531 result = cast(ConjectureResult, result)
532 if not (
533 len(attempt) == len(result.choices)
534 and endswith(result.nodes, nodes[end:])
535 ):
536 # Turns out this was a variable-length part, so grab the infix...
537 for span1, span2 in zip(
538 shrink_target.spans, result.spans, strict=False
539 ):
540 assert span1.start == span2.start
541 assert span1.start <= start
542 if span1.start == start and span1.end == end:
543 result_end = span2.end
544 break
545 else:
546 raise NotImplementedError("Expected matching prefixes")
547
548 attempt = (
549 choices[:start]
550 + result.choices[start:result_end]
551 + choices[end:]
552 )
553 chunks[(start, end)].append(result.choices[start:result_end])
554 result = self.engine.cached_test_function(attempt)
555
556 if result.status is Status.OVERRUN:
557 continue # pragma: no cover # flakily covered
558 result = cast(ConjectureResult, result)
559 else:
560 chunks[(start, end)].append(result.choices[start:end])
561
562 if shrink_target is not self.shrink_target: # pragma: no cover
563 # If we've shrunk further without meaning to, bail out.
564 self.shrink_target.slice_comments.clear()
565 return
566 if result.status is Status.VALID:
567 # The test passed, indicating that this param can't vary freely.
568 # However, it's really hard to write a simple and reliable covering
569 # test, because of our `seen_passing_buffers` check above.
570 break # pragma: no cover
571 if self.__predicate(result): # pragma: no branch
572 n_same_failures += 1
573 if n_same_failures >= 100:
574 self.shrink_target.slice_comments[(start, end)] = note
575 break
576
577 # Finally, if we've found multiple independently-variable parts, check whether
578 # they can all be varied together.
579 if len(self.shrink_target.slice_comments) <= 1:
580 return
581 n_same_failures_together = 0
582 # Only include slices that were actually added to slice_comments
583 chunks_by_start_index = sorted(
584 (k, v) for k, v in chunks.items() if k in self.shrink_target.slice_comments
585 )
586 for _ in range(500): # pragma: no branch
587 # no-branch here because we don't coverage-test the abort-at-500 logic.
588 new_choices: list[ChoiceT] = []
589 prev_end = 0
590 for (start, end), ls in chunks_by_start_index:
591 assert prev_end <= start < end, "these chunks must be nonoverlapping"
592 new_choices.extend(choices[prev_end:start])
593 new_choices.extend(self.random.choice(ls))
594 prev_end = end
595
596 result = self.engine.cached_test_function(new_choices)
597
598 # This *can't* be a shrink because none of the components were.
599 assert shrink_target is self.shrink_target
600 if result.status == Status.VALID:
601 self.shrink_target.slice_comments[(0, 0)] = (
602 "The test sometimes passed when commented parts were varied together."
603 )
604 break # Test passed, this param can't vary freely.
605 if self.__predicate(result): # pragma: no branch
606 n_same_failures_together += 1
607 if n_same_failures_together >= 100:
608 self.shrink_target.slice_comments[(0, 0)] = (
609 "The test always failed when commented parts were varied together."
610 )
611 break
612
613 def greedy_shrink(self) -> None:
614 """Run a full set of greedy shrinks (that is, ones that will only ever
615 move to a better target) and update shrink_target appropriately.
616
617 This method iterates to a fixed point and so is idempontent - calling
618 it twice will have exactly the same effect as calling it once.
619 """
620 self.fixate_shrink_passes(self.shrink_passes)
621
622 def initial_coarse_reduction(self):
623 """Performs some preliminary reductions that should not be
624 repeated as part of the main shrink passes.
625
626 The main reason why these can't be included as part of shrink
627 passes is that they have much more ability to make the test
628 case "worse". e.g. they might rerandomise part of it, significantly
629 increasing the value of individual nodes, which works in direct
630 opposition to the lexical shrinking and will frequently undo
631 its work.
632 """
633 self.reduce_each_alternative()
634
635 @derived_value # type: ignore
636 def spans_starting_at(self):
637 result = [[] for _ in self.shrink_target.nodes]
638 for i, ex in enumerate(self.spans):
639 # We can have zero-length spans that start at the end
640 if ex.start < len(result):
641 result[ex.start].append(i)
642 return tuple(map(tuple, result))
643
644 def reduce_each_alternative(self):
645 """This is a pass that is designed to rerandomise use of the
646 one_of strategy or things that look like it, in order to try
647 to move from later strategies to earlier ones in the branch
648 order.
649
650 It does this by trying to systematically lower each value it
651 finds that looks like it might be the branch decision for
652 one_of, and then attempts to repair any changes in shape that
653 this causes.
654 """
655 i = 0
656 while i < len(self.shrink_target.nodes):
657 nodes = self.shrink_target.nodes
658 node = nodes[i]
659 if (
660 node.type == "integer"
661 and not node.was_forced
662 and node.value <= 10
663 and node.constraints["min_value"] == 0
664 ):
665 assert isinstance(node.value, int)
666
667 # We've found a plausible candidate for a ``one_of`` choice.
668 # We now want to see if the shape of the test case actually depends
669 # on it. If it doesn't, then we don't need to do this (comparatively
670 # costly) pass, and can let much simpler lexicographic reduction
671 # handle it later.
672 #
673 # We test this by trying to set the value to zero and seeing if the
674 # shape changes, as measured by either changing the number of subsequent
675 # nodes, or changing the nodes in such a way as to cause one of the
676 # previous values to no longer be valid in its position.
677 zero_attempt = self.cached_test_function(
678 nodes[:i] + (nodes[i].copy(with_value=0),) + nodes[i + 1 :]
679 )[1]
680 if (
681 zero_attempt is not self.shrink_target
682 and zero_attempt is not None
683 and zero_attempt.status >= Status.VALID
684 ):
685 changed_shape = len(zero_attempt.nodes) != len(nodes)
686
687 if not changed_shape:
688 for j in range(i + 1, len(nodes)):
689 zero_node = zero_attempt.nodes[j]
690 orig_node = nodes[j]
691 if (
692 zero_node.type != orig_node.type
693 or not choice_permitted(
694 orig_node.value, zero_node.constraints
695 )
696 ):
697 changed_shape = True
698 break
699 if changed_shape:
700 for v in range(node.value):
701 if self.try_lower_node_as_alternative(i, v):
702 break
703 i += 1
704
705 def try_lower_node_as_alternative(self, i, v):
706 """Attempt to lower `self.shrink_target.nodes[i]` to `v`,
707 while rerandomising and attempting to repair any subsequent
708 changes to the shape of the test case that this causes."""
709 nodes = self.shrink_target.nodes
710 if self.consider_new_nodes(
711 nodes[:i] + (nodes[i].copy(with_value=v),) + nodes[i + 1 :]
712 ):
713 return True
714
715 prefix = nodes[:i] + (nodes[i].copy(with_value=v),)
716 initial = self.shrink_target
717 spans = self.spans_starting_at[i]
718 for _ in range(3):
719 random_attempt = self.engine.cached_test_function(
720 [n.value for n in prefix], extend=len(nodes)
721 )
722 if random_attempt.status < Status.VALID:
723 continue
724 self.incorporate_test_data(random_attempt)
725 for j in spans:
726 initial_span = initial.spans[j]
727 attempt_span = random_attempt.spans[j]
728 contents = random_attempt.nodes[attempt_span.start : attempt_span.end]
729 self.consider_new_nodes(
730 nodes[:i] + contents + nodes[initial_span.end :]
731 )
732 if initial is not self.shrink_target:
733 return True
734 return False
735
736 @derived_value # type: ignore
737 def shrink_pass_choice_trees(self) -> dict[Any, ChoiceTree]:
738 return defaultdict(ChoiceTree)
739
740 def step(self, shrink_pass: ShrinkPass, *, random_order: bool = False) -> bool:
741 tree = self.shrink_pass_choice_trees[shrink_pass]
742 if tree.exhausted:
743 return False
744
745 initial_shrinks = self.shrinks
746 initial_calls = self.calls
747 initial_misaligned = self.misaligned
748 size = len(self.shrink_target.choices)
749 assert shrink_pass.name is not None
750 self.engine.explain_next_call_as(shrink_pass.name)
751
752 if random_order:
753 selection_order = random_selection_order(self.random)
754 else:
755 selection_order = prefix_selection_order(shrink_pass.last_prefix)
756
757 try:
758 shrink_pass.last_prefix = tree.step(
759 selection_order,
760 lambda chooser: shrink_pass.function(chooser),
761 )
762 finally:
763 shrink_pass.calls += self.calls - initial_calls
764 shrink_pass.misaligned += self.misaligned - initial_misaligned
765 shrink_pass.shrinks += self.shrinks - initial_shrinks
766 shrink_pass.deletions += size - len(self.shrink_target.choices)
767 self.engine.clear_call_explanation()
768 return True
769
770 def fixate_shrink_passes(self, passes: list[ShrinkPass]) -> None:
771 """Run steps from each pass in ``passes`` until the current shrink target
772 is a fixed point of all of them."""
773 any_ran = True
774 while any_ran:
775 any_ran = False
776
777 reordering = {}
778
779 # We run remove_discarded after every pass to do cleanup
780 # keeping track of whether that actually works. Either there is
781 # no discarded data and it is basically free, or it reliably works
782 # and deletes data, or it doesn't work. In that latter case we turn
783 # it off for the rest of this loop through the passes, but will
784 # try again once all of the passes have been run.
785 can_discard = self.remove_discarded()
786
787 calls_at_loop_start = self.calls
788
789 # We keep track of how many calls can be made by a single step
790 # without making progress and use this to test how much to pad
791 # out self.max_stall by as we go along.
792 max_calls_per_failing_step = 1
793
794 for sp in passes:
795 if can_discard:
796 can_discard = self.remove_discarded()
797
798 before_sp = self.shrink_target
799
800 # Run the shrink pass until it fails to make any progress
801 # max_failures times in a row. This implicitly boosts shrink
802 # passes that are more likely to work.
803 failures = 0
804 max_failures = 20
805 while failures < max_failures:
806 # We don't allow more than max_stall consecutive failures
807 # to shrink, but this means that if we're unlucky and the
808 # shrink passes are in a bad order where only the ones at
809 # the end are useful, if we're not careful this heuristic
810 # might stop us before we've tried everything. In order to
811 # avoid that happening, we make sure that there's always
812 # plenty of breathing room to make it through a single
813 # iteration of the fixate_shrink_passes loop.
814 self.max_stall = max(
815 self.max_stall,
816 2 * max_calls_per_failing_step
817 + (self.calls - calls_at_loop_start),
818 )
819
820 prev = self.shrink_target
821 initial_calls = self.calls
822 # It's better for us to run shrink passes in a deterministic
823 # order, to avoid repeat work, but this can cause us to create
824 # long stalls when there are a lot of steps which fail to do
825 # anything useful. In order to avoid this, once we've noticed
826 # we're in a stall (i.e. half of max_failures calls have failed
827 # to do anything) we switch to randomly jumping around. If we
828 # find a success then we'll resume deterministic order from
829 # there which, with any luck, is in a new good region.
830 if not self.step(sp, random_order=failures >= max_failures // 2):
831 # step returns False when there is nothing to do because
832 # the entire choice tree is exhausted. If this happens
833 # we break because we literally can't run this pass any
834 # more than we already have until something else makes
835 # progress.
836 break
837 any_ran = True
838
839 # Don't count steps that didn't actually try to do
840 # anything as failures. Otherwise, this call is a failure
841 # if it failed to make any changes to the shrink target.
842 if initial_calls != self.calls:
843 if prev is not self.shrink_target:
844 failures = 0
845 else:
846 max_calls_per_failing_step = max(
847 max_calls_per_failing_step, self.calls - initial_calls
848 )
849 failures += 1
850
851 # We reorder the shrink passes so that on our next run through
852 # we try good ones first. The rule is that shrink passes that
853 # did nothing useful are the worst, shrink passes that reduced
854 # the length are the best.
855 if self.shrink_target is before_sp:
856 reordering[sp] = 1
857 elif len(self.choices) < len(before_sp.choices):
858 reordering[sp] = -1
859 else:
860 reordering[sp] = 0
861
862 passes.sort(key=reordering.__getitem__)
863
864 @property
865 def nodes(self) -> tuple[ChoiceNode, ...]:
866 return self.shrink_target.nodes
867
868 @property
869 def choices(self) -> tuple[ChoiceT, ...]:
870 return self.shrink_target.choices
871
872 @property
873 def spans(self) -> Spans:
874 return self.shrink_target.spans
875
876 @derived_value # type: ignore
877 def spans_by_label(self):
878 """
879 A mapping of labels to a list of spans with that label. Spans in the list
880 are ordered by their normal index order.
881 """
882
883 spans_by_label = defaultdict(list)
884 for ex in self.spans:
885 spans_by_label[ex.label].append(ex)
886 return dict(spans_by_label)
887
888 @derived_value # type: ignore
889 def distinct_labels(self):
890 return sorted(self.spans_by_label, key=str)
891
892 def pass_to_descendant(self, chooser):
893 """Attempt to replace each span with a descendant span.
894
895 This is designed to deal with strategies that call themselves
896 recursively. For example, suppose we had:
897
898 binary_tree = st.deferred(
899 lambda: st.one_of(
900 st.integers(), st.tuples(binary_tree, binary_tree)))
901
902 This pass guarantees that we can replace any binary tree with one of
903 its subtrees - each of those will create an interval that the parent
904 could validly be replaced with, and this pass will try doing that.
905
906 This is pretty expensive - it takes O(len(intervals)^2) - so we run it
907 late in the process when we've got the number of intervals as far down
908 as possible.
909 """
910
911 label = chooser.choose(
912 self.distinct_labels, lambda l: len(self.spans_by_label[l]) >= 2
913 )
914
915 spans = self.spans_by_label[label]
916 i = chooser.choose(range(len(spans) - 1))
917 ancestor = spans[i]
918
919 if i + 1 == len(spans) or spans[i + 1].start >= ancestor.end:
920 return
921
922 @self.cached(label, i)
923 def descendants():
924 lo = i + 1
925 hi = len(spans)
926 while lo + 1 < hi:
927 mid = (lo + hi) // 2
928 if spans[mid].start >= ancestor.end:
929 hi = mid
930 else:
931 lo = mid
932 return [
933 span
934 for span in spans[i + 1 : hi]
935 if span.choice_count < ancestor.choice_count
936 ]
937
938 descendant = chooser.choose(descendants, lambda ex: ex.choice_count > 0)
939
940 assert ancestor.start <= descendant.start
941 assert ancestor.end >= descendant.end
942 assert descendant.choice_count < ancestor.choice_count
943
944 self.consider_new_nodes(
945 self.nodes[: ancestor.start]
946 + self.nodes[descendant.start : descendant.end]
947 + self.nodes[ancestor.end :]
948 )
949
950 def lower_common_node_offset(self):
951 """Sometimes we find ourselves in a situation where changes to one part
952 of the choice sequence unlock changes to other parts. Sometimes this is
953 good, but sometimes this can cause us to exhibit exponential slow
954 downs!
955
956 e.g. suppose we had the following:
957
958 m = draw(integers(min_value=0))
959 n = draw(integers(min_value=0))
960 assert abs(m - n) > 1
961
962 If this fails then we'll end up with a loop where on each iteration we
963 reduce each of m and n by 2 - m can't go lower because of n, then n
964 can't go lower because of m.
965
966 This will take us O(m) iterations to complete, which is exponential in
967 the data size, as we gradually zig zag our way towards zero.
968
969 This can only happen if we're failing to reduce the size of the choice
970 sequence: The number of iterations that reduce the length of the choice
971 sequence is bounded by that length.
972
973 So what we do is this: We keep track of which nodes are changing, and
974 then if there's some non-zero common offset to them we try and minimize
975 them all at once by lowering that offset.
976
977 This may not work, and it definitely won't get us out of all possible
978 exponential slow downs (an example of where it doesn't is where the
979 shape of the nodes changes as a result of this bouncing behaviour),
980 but it fails fast when it doesn't work and gets us out of a really
981 nastily slow case when it does.
982 """
983 if len(self.__changed_nodes) <= 1:
984 return
985
986 changed = []
987 for i in sorted(self.__changed_nodes):
988 node = self.nodes[i]
989 if node.trivial or node.type != "integer":
990 continue
991 changed.append(node)
992
993 if not changed:
994 return
995
996 ints = [
997 abs(node.value - node.constraints["shrink_towards"]) for node in changed
998 ]
999 offset = min(ints)
1000 assert offset > 0
1001
1002 for i in range(len(ints)):
1003 ints[i] -= offset
1004
1005 st = self.shrink_target
1006
1007 def offset_node(node, n):
1008 return (
1009 node.index,
1010 node.index + 1,
1011 [node.copy(with_value=node.constraints["shrink_towards"] + n)],
1012 )
1013
1014 def consider(n, sign):
1015 return self.consider_new_nodes(
1016 replace_all(
1017 st.nodes,
1018 [
1019 offset_node(node, sign * (n + v))
1020 for node, v in zip(changed, ints, strict=False)
1021 ],
1022 )
1023 )
1024
1025 # shrink from both sides
1026 Integer.shrink(offset, lambda n: consider(n, 1))
1027 Integer.shrink(offset, lambda n: consider(n, -1))
1028 self.clear_change_tracking()
1029
1030 def clear_change_tracking(self):
1031 self.__last_checked_changed_at = self.shrink_target
1032 self.__all_changed_nodes = set()
1033
1034 def mark_changed(self, i):
1035 self.__changed_nodes.add(i)
1036
1037 @property
1038 def __changed_nodes(self) -> set[int]:
1039 if self.__last_checked_changed_at is self.shrink_target:
1040 return self.__all_changed_nodes
1041
1042 prev_target = self.__last_checked_changed_at
1043 new_target = self.shrink_target
1044 assert prev_target is not new_target
1045 prev_nodes = prev_target.nodes
1046 new_nodes = new_target.nodes
1047 assert sort_key(new_target.nodes) < sort_key(prev_target.nodes)
1048
1049 if len(prev_nodes) != len(new_nodes) or any(
1050 n1.type != n2.type for n1, n2 in zip(prev_nodes, new_nodes, strict=True)
1051 ):
1052 # should we check constraints are equal as well?
1053 self.__all_changed_nodes = set()
1054 else:
1055 assert len(prev_nodes) == len(new_nodes)
1056 for i, (n1, n2) in enumerate(zip(prev_nodes, new_nodes, strict=True)):
1057 assert n1.type == n2.type
1058 if not choice_equal(n1.value, n2.value):
1059 self.__all_changed_nodes.add(i)
1060
1061 return self.__all_changed_nodes
1062
1063 def update_shrink_target(self, new_target):
1064 assert isinstance(new_target, ConjectureResult)
1065 self.shrinks += 1
1066 # If we are just taking a long time to shrink we don't want to
1067 # trigger this heuristic, so whenever we shrink successfully
1068 # we give ourselves a bit of breathing room to make sure we
1069 # would find a shrink that took that long to find the next time.
1070 # The case where we're taking a long time but making steady
1071 # progress is handled by `finish_shrinking_deadline` in engine.py
1072 self.max_stall = max(
1073 self.max_stall, (self.calls - self.calls_at_last_shrink) * 2
1074 )
1075 self.calls_at_last_shrink = self.calls
1076 self.shrink_target = new_target
1077 self.__derived_values = {}
1078
1079 def try_shrinking_nodes(self, nodes, n):
1080 """Attempts to replace each node in the nodes list with n. Returns
1081 True if it succeeded (which may include some additional modifications
1082 to shrink_target).
1083
1084 In current usage it is expected that each of the nodes currently have
1085 the same value and choice_type, although this is not essential. Note that
1086 n must be < the node at min(nodes) or this is not a valid shrink.
1087
1088 This method will attempt to do some small amount of work to delete data
1089 that occurs after the end of the nodes. This is useful for cases where
1090 there is some size dependency on the value of a node.
1091 """
1092 # If the length of the shrink target has changed from under us such that
1093 # the indices are out of bounds, give up on the replacement.
1094 # TODO_BETTER_SHRINK: we probably want to narrow down the root cause here at some point.
1095 if any(node.index >= len(self.nodes) for node in nodes):
1096 return # pragma: no cover
1097
1098 initial_attempt = replace_all(
1099 self.nodes,
1100 [(node.index, node.index + 1, [node.copy(with_value=n)]) for node in nodes],
1101 )
1102
1103 attempt = self.cached_test_function(initial_attempt)[1]
1104
1105 if attempt is None:
1106 return False
1107
1108 if attempt is self.shrink_target:
1109 # if the initial shrink was a success, try lowering offsets.
1110 self.lower_common_node_offset()
1111 return True
1112
1113 # If this produced something completely invalid we ditch it
1114 # here rather than trying to persevere.
1115 if attempt.status is Status.OVERRUN:
1116 return False
1117
1118 if attempt.status is Status.INVALID:
1119 return False
1120
1121 if attempt.misaligned_at is not None:
1122 # we're invalid due to a misalignment in the tree. We'll try to fix
1123 # a very specific type of misalignment here: where we have a node of
1124 # {"size": n} and tried to draw the same node, but with {"size": m < n}.
1125 # This can occur with eg
1126 #
1127 # n = data.draw_integer()
1128 # s = data.draw_string(min_size=n)
1129 #
1130 # where we try lowering n, resulting in the test_function drawing a lower
1131 # min_size than our attempt had for the draw_string node.
1132 #
1133 # We'll now try realigning this tree by:
1134 # * replacing the constraints in our attempt with what test_function tried
1135 # to draw in practice
1136 # * truncating the value of that node to match min_size
1137 #
1138 # This helps in the specific case of drawing a value and then drawing
1139 # a collection of that size...and not much else. In practice this
1140 # helps because this antipattern is fairly common.
1141
1142 # TODO we'll probably want to apply the same trick as in the valid
1143 # case of this function of preserving from the right instead of
1144 # preserving from the left. see test_can_shrink_variable_string_draws.
1145
1146 index, attempt_choice_type, attempt_constraints, _attempt_forced = (
1147 attempt.misaligned_at
1148 )
1149 node = self.nodes[index]
1150 if node.type != attempt_choice_type:
1151 return False # pragma: no cover
1152 if node.was_forced:
1153 return False # pragma: no cover
1154
1155 if node.type in {"string", "bytes"}:
1156 # if the size *increased*, we would have to guess what to pad with
1157 # in order to try fixing up this attempt. Just give up.
1158 if node.constraints["min_size"] <= attempt_constraints["min_size"]:
1159 # attempts which increase min_size tend to overrun rather than
1160 # be misaligned, making a covering case difficult.
1161 return False # pragma: no cover
1162 # the size decreased in our attempt. Try again, but truncate the value
1163 # to that size by removing any elements past min_size.
1164 return self.consider_new_nodes(
1165 initial_attempt[: node.index]
1166 + [
1167 initial_attempt[node.index].copy(
1168 with_constraints=attempt_constraints,
1169 with_value=initial_attempt[node.index].value[
1170 : attempt_constraints["min_size"]
1171 ],
1172 )
1173 ]
1174 + initial_attempt[node.index :]
1175 )
1176
1177 lost_nodes = len(self.nodes) - len(attempt.nodes)
1178 if lost_nodes <= 0:
1179 return False
1180
1181 start = nodes[0].index
1182 end = nodes[-1].index + 1
1183 # We now look for contiguous regions to delete that might help fix up
1184 # this failed shrink. We only look for contiguous regions of the right
1185 # lengths because doing anything more than that starts to get very
1186 # expensive. See minimize_individual_choices for where we
1187 # try to be more aggressive.
1188 regions_to_delete = {(end, end + lost_nodes)}
1189
1190 for ex in self.spans:
1191 if ex.start > start:
1192 continue
1193 if ex.end <= end:
1194 continue
1195
1196 if ex.index >= len(attempt.spans):
1197 continue # pragma: no cover
1198
1199 replacement = attempt.spans[ex.index]
1200 in_original = [c for c in ex.children if c.start >= end]
1201 in_replaced = [c for c in replacement.children if c.start >= end]
1202
1203 if len(in_replaced) >= len(in_original) or not in_replaced:
1204 continue
1205
1206 # We've found a span where some of the children went missing
1207 # as a result of this change, and just replacing it with the data
1208 # it would have had and removing the spillover didn't work. This
1209 # means that some of its children towards the right must be
1210 # important, so we try to arrange it so that it retains its
1211 # rightmost children instead of its leftmost.
1212 regions_to_delete.add(
1213 (in_original[0].start, in_original[-len(in_replaced)].start)
1214 )
1215
1216 for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):
1217 try_with_deleted = initial_attempt[:u] + initial_attempt[v:]
1218 if self.consider_new_nodes(try_with_deleted):
1219 return True
1220
1221 return False
1222
1223 def remove_discarded(self):
1224 """Try removing all bytes marked as discarded.
1225
1226 This is primarily to deal with data that has been ignored while
1227 doing rejection sampling - e.g. as a result of an integer range, or a
1228 filtered strategy.
1229
1230 Such data will also be handled by the adaptive_example_deletion pass,
1231 but that pass is necessarily more conservative and will try deleting
1232 each interval individually. The common case is that all data drawn and
1233 rejected can just be thrown away immediately in one block, so this pass
1234 will be much faster than trying each one individually when it works.
1235
1236 returns False if there is discarded data and removing it does not work,
1237 otherwise returns True.
1238 """
1239 while self.shrink_target.has_discards:
1240 discarded = []
1241
1242 for ex in self.shrink_target.spans:
1243 if (
1244 ex.choice_count > 0
1245 and ex.discarded
1246 and (not discarded or ex.start >= discarded[-1][-1])
1247 ):
1248 discarded.append((ex.start, ex.end))
1249
1250 # This can happen if we have discards but they are all of
1251 # zero length. This shouldn't happen very often so it's
1252 # faster to check for it here than at the point of example
1253 # generation.
1254 if not discarded:
1255 break
1256
1257 attempt = list(self.nodes)
1258 for u, v in reversed(discarded):
1259 del attempt[u:v]
1260
1261 if not self.consider_new_nodes(tuple(attempt)):
1262 return False
1263 return True
1264
1265 @derived_value # type: ignore
1266 def duplicated_nodes(self):
1267 """Returns a list of nodes grouped (choice_type, value)."""
1268 duplicates = defaultdict(list)
1269 for node in self.nodes:
1270 duplicates[(node.type, choice_key(node.value))].append(node)
1271 return list(duplicates.values())
1272
1273 def node_program(self, program: str) -> ShrinkPass:
1274 return ShrinkPass(
1275 lambda chooser: self._node_program(chooser, program),
1276 name=f"node_program_{program}",
1277 )
1278
1279 def _node_program(self, chooser, program):
1280 n = len(program)
1281 # Adaptively attempt to run the node program at the current
1282 # index. If this successfully applies the node program ``k`` times
1283 # then this runs in ``O(log(k))`` test function calls.
1284 i = chooser.choose(range(len(self.nodes) - n + 1))
1285
1286 # First, run the node program at the chosen index. If this fails,
1287 # don't do any extra work, so that failure is as cheap as possible.
1288 if not self.run_node_program(i, program, original=self.shrink_target):
1289 return
1290
1291 # Because we run in a random order we will often find ourselves in the middle
1292 # of a region where we could run the node program. We thus start by moving
1293 # left to the beginning of that region if possible in order to start from
1294 # the beginning of that region.
1295 def offset_left(k):
1296 return i - k * n
1297
1298 i = offset_left(
1299 find_integer(
1300 lambda k: self.run_node_program(
1301 offset_left(k), program, original=self.shrink_target
1302 )
1303 )
1304 )
1305
1306 original = self.shrink_target
1307 # Now try to run the node program multiple times here.
1308 find_integer(
1309 lambda k: self.run_node_program(i, program, original=original, repeats=k)
1310 )
1311
1312 def minimize_duplicated_choices(self, chooser):
1313 """Find choices that have been duplicated in multiple places and attempt
1314 to minimize all of the duplicates simultaneously.
1315
1316 This lets us handle cases where two values can't be shrunk
1317 independently of each other but can easily be shrunk together.
1318 For example if we had something like:
1319
1320 ls = data.draw(lists(integers()))
1321 y = data.draw(integers())
1322 assert y not in ls
1323
1324 Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
1325 to replace both 3s with 0, this would be a valid shrink, but if we were
1326 to replace either 3 with 0 on its own the test would start passing.
1327
1328 It is also useful for when that duplication is accidental and the value
1329 of the choices don't matter very much because it allows us to replace
1330 more values at once.
1331 """
1332 nodes = chooser.choose(self.duplicated_nodes)
1333 # we can't lower any nodes which are trivial. try proceeding with the
1334 # remaining nodes.
1335 nodes = [node for node in nodes if not node.trivial]
1336 if len(nodes) <= 1:
1337 return
1338
1339 self.minimize_nodes(nodes)
1340
1341 def redistribute_numeric_pairs(self, chooser):
1342 """If there is a sum of generated numbers that we need their sum
1343 to exceed some bound, lowering one of them requires raising the
1344 other. This pass enables that."""
1345
1346 # look for a pair of nodes (node1, node2) which are both numeric
1347 # and aren't separated by too many other nodes. We'll decrease node1 and
1348 # increase node2 (note that the other way around doesn't make sense as
1349 # it's strictly worse in the ordering).
1350 def can_choose_node(node):
1351 # don't choose nan, inf, or floats above the threshold where f + 1 > f
1352 # (which is not necessarily true for floats above MAX_PRECISE_INTEGER).
1353 # The motivation for the last condition is to avoid trying weird
1354 # non-shrinks where we raise one node and think we lowered another
1355 # (but didn't).
1356 return node.type in {"integer", "float"} and not (
1357 node.type == "float"
1358 and (math.isnan(node.value) or abs(node.value) >= MAX_PRECISE_INTEGER)
1359 )
1360
1361 node1 = chooser.choose(
1362 self.nodes,
1363 lambda node: can_choose_node(node) and not node.trivial,
1364 )
1365 node2 = chooser.choose(
1366 self.nodes,
1367 lambda node: can_choose_node(node)
1368 # Note that it's fine for node2 to be trivial, because we're going to
1369 # explicitly make it *not* trivial by adding to its value.
1370 and not node.was_forced
1371 # to avoid quadratic behavior, scan ahead only a small amount for
1372 # the related node.
1373 and node1.index < node.index <= node1.index + 4,
1374 )
1375
1376 m: int | float = node1.value
1377 n: int | float = node2.value
1378
1379 def boost(k: int) -> bool:
1380 # floats always shrink towards 0
1381 shrink_towards = (
1382 node1.constraints["shrink_towards"] if node1.type == "integer" else 0
1383 )
1384 if k > abs(m - shrink_towards):
1385 return False
1386
1387 # We are trying to move node1 (m) closer to shrink_towards, and node2
1388 # (n) farther away from shrink_towards. If m is below shrink_towards,
1389 # we want to add to m and subtract from n, and vice versa if above
1390 # shrink_towards.
1391 if m < shrink_towards:
1392 k = -k
1393
1394 try:
1395 v1 = m - k
1396 v2 = n + k
1397 except OverflowError: # pragma: no cover
1398 # if n or m is a float and k is over sys.float_info.max, coercing
1399 # k to a float will overflow.
1400 return False
1401
1402 # if we've increased node2 to the point that we're past max precision,
1403 # give up - things have become too unstable.
1404 if node1.type == "float" and abs(v2) >= MAX_PRECISE_INTEGER:
1405 return False
1406
1407 return self.consider_new_nodes(
1408 self.nodes[: node1.index]
1409 + (node1.copy(with_value=v1),)
1410 + self.nodes[node1.index + 1 : node2.index]
1411 + (node2.copy(with_value=v2),)
1412 + self.nodes[node2.index + 1 :]
1413 )
1414
1415 find_integer(boost)
1416
1417 def lower_integers_together(self, chooser):
1418 node1 = chooser.choose(
1419 self.nodes, lambda n: n.type == "integer" and not n.trivial
1420 )
1421 # Search up to 3 nodes ahead, to avoid quadratic time.
1422 node2 = self.nodes[
1423 chooser.choose(
1424 range(node1.index + 1, min(len(self.nodes), node1.index + 3 + 1)),
1425 lambda i: self.nodes[i].type == "integer"
1426 and not self.nodes[i].was_forced,
1427 )
1428 ]
1429
1430 # one might expect us to require node2 to be nontrivial, and to minimize
1431 # the node which is closer to its shrink_towards, rather than node1
1432 # unconditionally. In reality, it's acceptable for us to transition node2
1433 # from trivial to nontrivial, because the shrink ordering is dominated by
1434 # the complexity of the earlier node1. What matters is minimizing node1.
1435 shrink_towards = node1.constraints["shrink_towards"]
1436
1437 def consider(n):
1438 return self.consider_new_nodes(
1439 self.nodes[: node1.index]
1440 + (node1.copy(with_value=node1.value - n),)
1441 + self.nodes[node1.index + 1 : node2.index]
1442 + (node2.copy(with_value=node2.value - n),)
1443 + self.nodes[node2.index + 1 :]
1444 )
1445
1446 find_integer(lambda n: consider(shrink_towards - n))
1447 find_integer(lambda n: consider(n - shrink_towards))
1448
1449 def lower_duplicated_characters(self, chooser):
1450 """
1451 Select two string choices no more than 4 choices apart and simultaneously
1452 lower characters which appear in both strings. This helps cases where the
1453 same character must appear in two strings, but the actual value of the
1454 character is not relevant.
1455
1456 This shrinking pass currently only tries lowering *all* instances of the
1457 duplicated character in both strings. So for instance, given two choices:
1458
1459 "bbac"
1460 "abbb"
1461
1462 we would try lowering all five of the b characters simultaneously. This
1463 may fail to shrink some cases where only certain character indices are
1464 correlated, for instance if only the b at index 1 could be lowered
1465 simultaneously and the rest did in fact actually have to be a `b`.
1466
1467 It would be nice to try shrinking that case as well, but we would need good
1468 safeguards because it could get very expensive to try all combinations.
1469 I expect lowering all duplicates to handle most cases in the meantime.
1470 """
1471 node1 = chooser.choose(
1472 self.nodes, lambda n: n.type == "string" and not n.trivial
1473 )
1474
1475 # limit search to up to 4 choices ahead, to avoid quadratic behavior
1476 node2 = self.nodes[
1477 chooser.choose(
1478 range(node1.index + 1, min(len(self.nodes), node1.index + 1 + 4)),
1479 lambda i: self.nodes[i].type == "string" and not self.nodes[i].trivial
1480 # select nodes which have at least one of the same character present
1481 and set(node1.value) & set(self.nodes[i].value),
1482 )
1483 ]
1484
1485 duplicated_characters = set(node1.value) & set(node2.value)
1486 # deterministic ordering
1487 char = chooser.choose(sorted(duplicated_characters))
1488 intervals = node1.constraints["intervals"]
1489
1490 def copy_node(node, n):
1491 # replace all duplicate characters in each string. This might miss
1492 # some shrinks compared to only replacing some, but trying all possible
1493 # combinations of indices could get expensive if done without some
1494 # thought.
1495 return node.copy(
1496 with_value=node.value.replace(char, intervals.char_in_shrink_order(n))
1497 )
1498
1499 Integer.shrink(
1500 intervals.index_from_char_in_shrink_order(char),
1501 lambda n: self.consider_new_nodes(
1502 self.nodes[: node1.index]
1503 + (copy_node(node1, n),)
1504 + self.nodes[node1.index + 1 : node2.index]
1505 + (copy_node(node2, n),)
1506 + self.nodes[node2.index + 1 :]
1507 ),
1508 )
1509
1510 def minimize_nodes(self, nodes):
1511 choice_type = nodes[0].type
1512 value = nodes[0].value
1513 # unlike choice_type and value, constraints are *not* guaranteed to be equal among all
1514 # passed nodes. We arbitrarily use the constraints of the first node. I think
1515 # this is unsound (= leads to us trying shrinks that could not have been
1516 # generated), but those get discarded at test-time, and this enables useful
1517 # slips where constraints are not equal but are close enough that doing the
1518 # same operation on both basically just works.
1519 constraints = nodes[0].constraints
1520 assert all(
1521 node.type == choice_type and choice_equal(node.value, value)
1522 for node in nodes
1523 )
1524
1525 if choice_type == "integer":
1526 shrink_towards = constraints["shrink_towards"]
1527 # try shrinking from both sides towards shrink_towards.
1528 # we're starting from n = abs(shrink_towards - value). Because the
1529 # shrinker will not check its starting value, we need to try
1530 # shrinking to n first.
1531 self.try_shrinking_nodes(nodes, abs(shrink_towards - value))
1532 Integer.shrink(
1533 abs(shrink_towards - value),
1534 lambda n: self.try_shrinking_nodes(nodes, shrink_towards + n),
1535 )
1536 Integer.shrink(
1537 abs(shrink_towards - value),
1538 lambda n: self.try_shrinking_nodes(nodes, shrink_towards - n),
1539 )
1540 elif choice_type == "float":
1541 self.try_shrinking_nodes(nodes, abs(value))
1542 Float.shrink(
1543 abs(value),
1544 lambda val: self.try_shrinking_nodes(nodes, val),
1545 )
1546 Float.shrink(
1547 abs(value),
1548 lambda val: self.try_shrinking_nodes(nodes, -val),
1549 )
1550 elif choice_type == "boolean":
1551 # must be True, otherwise would be trivial and not selected.
1552 assert value is True
1553 # only one thing to try: false!
1554 self.try_shrinking_nodes(nodes, False)
1555 elif choice_type == "bytes":
1556 Bytes.shrink(
1557 value,
1558 lambda val: self.try_shrinking_nodes(nodes, val),
1559 min_size=constraints["min_size"],
1560 )
1561 elif choice_type == "string":
1562 String.shrink(
1563 value,
1564 lambda val: self.try_shrinking_nodes(nodes, val),
1565 intervals=constraints["intervals"],
1566 min_size=constraints["min_size"],
1567 )
1568 else:
1569 raise NotImplementedError
1570
1571 def try_trivial_spans(self, chooser):
1572 i = chooser.choose(range(len(self.spans)))
1573
1574 prev = self.shrink_target
1575 nodes = self.shrink_target.nodes
1576 span = self.spans[i]
1577 prefix = nodes[: span.start]
1578 replacement = tuple(
1579 [
1580 (
1581 node
1582 if node.was_forced
1583 else node.copy(
1584 with_value=choice_from_index(0, node.type, node.constraints)
1585 )
1586 )
1587 for node in nodes[span.start : span.end]
1588 ]
1589 )
1590 suffix = nodes[span.end :]
1591 attempt = self.cached_test_function(prefix + replacement + suffix)[1]
1592
1593 if self.shrink_target is not prev:
1594 return
1595
1596 if isinstance(attempt, ConjectureResult):
1597 new_span = attempt.spans[i]
1598 new_replacement = attempt.nodes[new_span.start : new_span.end]
1599 self.consider_new_nodes(prefix + new_replacement + suffix)
1600
1601 def minimize_individual_choices(self, chooser):
1602 """Attempt to minimize each choice in sequence.
1603
1604 This is the pass that ensures that e.g. each integer we draw is a
1605 minimum value. So it's the part that guarantees that if we e.g. do
1606
1607 x = data.draw(integers())
1608 assert x < 10
1609
1610 then in our shrunk example, x = 10 rather than say 97.
1611
1612 If we are unsuccessful at minimizing a choice of interest we then
1613 check if that's because it's changing the size of the test case and,
1614 if so, we also make an attempt to delete parts of the test case to
1615 see if that fixes it.
1616
1617 We handle most of the common cases in try_shrinking_nodes which is
1618 pretty good at clearing out large contiguous blocks of dead space,
1619 but it fails when there is data that has to stay in particular places
1620 in the list.
1621 """
1622 node = chooser.choose(self.nodes, lambda node: not node.trivial)
1623 initial_target = self.shrink_target
1624
1625 self.minimize_nodes([node])
1626 if self.shrink_target is not initial_target:
1627 # the shrink target changed, so our shrink worked. Defer doing
1628 # anything more intelligent until this shrink fails.
1629 return
1630
1631 # the shrink failed. One particularly common case where minimizing a
1632 # node can fail is the antipattern of drawing a size and then drawing a
1633 # collection of that size, or more generally when there is a size
1634 # dependency on some single node. We'll explicitly try and fix up this
1635 # common case here: if decreasing an integer node by one would reduce
1636 # the size of the generated input, we'll try deleting things after that
1637 # node and see if the resulting attempt works.
1638
1639 if node.type != "integer":
1640 # Only try this fixup logic on integer draws. Almost all size
1641 # dependencies are on integer draws, and if it's not, it's doing
1642 # something convoluted enough that it is unlikely to shrink well anyway.
1643 # TODO: extent to floats? we probably currently fail on the following,
1644 # albeit convoluted example:
1645 # n = int(data.draw(st.floats()))
1646 # s = data.draw(st.lists(st.integers(), min_size=n, max_size=n))
1647 return
1648
1649 lowered = (
1650 self.nodes[: node.index]
1651 + (node.copy(with_value=node.value - 1),)
1652 + self.nodes[node.index + 1 :]
1653 )
1654 attempt = self.cached_test_function(lowered)[1]
1655 if (
1656 attempt is None
1657 or attempt.status < Status.VALID
1658 or len(attempt.nodes) == len(self.nodes)
1659 or len(attempt.nodes) == node.index + 1
1660 ):
1661 # no point in trying our size-dependency-logic if our attempt at
1662 # lowering the node resulted in:
1663 # * an invalid conjecture data
1664 # * the same number of nodes as before
1665 # * no nodes beyond the lowered node (nothing to try to delete afterwards)
1666 return
1667
1668 # If it were then the original shrink should have worked and we could
1669 # never have got here.
1670 assert attempt is not self.shrink_target
1671
1672 @self.cached(node.index)
1673 def first_span_after_node():
1674 lo = 0
1675 hi = len(self.spans)
1676 while lo + 1 < hi:
1677 mid = (lo + hi) // 2
1678 span = self.spans[mid]
1679 if span.start >= node.index:
1680 hi = mid
1681 else:
1682 lo = mid
1683 return hi
1684
1685 # we try deleting both entire spans, and single nodes.
1686 # If we wanted to get more aggressive, we could try deleting n
1687 # consecutive nodes (that don't cross a span boundary) for say
1688 # n <= 2 or n <= 3.
1689 if chooser.choose([True, False]):
1690 span = self.spans[
1691 chooser.choose(
1692 range(first_span_after_node, len(self.spans)),
1693 lambda i: self.spans[i].choice_count > 0,
1694 )
1695 ]
1696 self.consider_new_nodes(lowered[: span.start] + lowered[span.end :])
1697 else:
1698 node = self.nodes[chooser.choose(range(node.index + 1, len(self.nodes)))]
1699 self.consider_new_nodes(lowered[: node.index] + lowered[node.index + 1 :])
1700
1701 def reorder_spans(self, chooser):
1702 """This pass allows us to reorder the children of each span.
1703
1704 For example, consider the following:
1705
1706 .. code-block:: python
1707
1708 import hypothesis.strategies as st
1709 from hypothesis import given
1710
1711
1712 @given(st.text(), st.text())
1713 def test_not_equal(x, y):
1714 assert x != y
1715
1716 Without the ability to reorder x and y this could fail either with
1717 ``x=""``, ``y="0"``, or the other way around. With reordering it will
1718 reliably fail with ``x=""``, ``y="0"``.
1719 """
1720 span = chooser.choose(self.spans)
1721
1722 label = chooser.choose(span.children).label
1723 spans = [c for c in span.children if c.label == label]
1724 if len(spans) <= 1:
1725 return
1726
1727 endpoints = [(span.start, span.end) for span in spans]
1728 st = self.shrink_target
1729
1730 Ordering.shrink(
1731 range(len(spans)),
1732 lambda indices: self.consider_new_nodes(
1733 replace_all(
1734 st.nodes,
1735 [
1736 (
1737 u,
1738 v,
1739 st.nodes[spans[i].start : spans[i].end],
1740 )
1741 for (u, v), i in zip(endpoints, indices, strict=True)
1742 ],
1743 )
1744 ),
1745 key=lambda i: sort_key(st.nodes[spans[i].start : spans[i].end]),
1746 )
1747
1748 def run_node_program(self, i, program, original, repeats=1):
1749 """Node programs are a mini-DSL for node rewriting, defined as a sequence
1750 of commands that can be run at some index into the nodes
1751
1752 Commands are:
1753
1754 * "X", delete this node
1755
1756 This method runs the node program in ``program`` at node index
1757 ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it
1758 will attempt to approximate the results of running it that many times.
1759
1760 Returns True if this successfully changes the underlying shrink target,
1761 else False.
1762 """
1763 if i + len(program) > len(original.nodes) or i < 0:
1764 return False
1765 attempt = list(original.nodes)
1766 for _ in range(repeats):
1767 for k, command in reversed(list(enumerate(program))):
1768 j = i + k
1769 if j >= len(attempt):
1770 return False
1771
1772 if command == "X":
1773 del attempt[j]
1774 else:
1775 raise NotImplementedError(f"Unrecognised command {command!r}")
1776
1777 return self.consider_new_nodes(attempt)