1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import math
12from collections import defaultdict
13from collections.abc import Sequence
14from dataclasses import dataclass
15from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast
16
17from hypothesis.internal.conjecture.choice import (
18 ChoiceNode,
19 ChoiceT,
20 choice_equal,
21 choice_from_index,
22 choice_key,
23 choice_permitted,
24 choice_to_index,
25)
26from hypothesis.internal.conjecture.data import (
27 ConjectureData,
28 ConjectureResult,
29 Spans,
30 Status,
31 _Overrun,
32 draw_choice,
33)
34from hypothesis.internal.conjecture.junkdrawer import (
35 endswith,
36 find_integer,
37 replace_all,
38 startswith,
39)
40from hypothesis.internal.conjecture.shrinking import (
41 Bytes,
42 Float,
43 Integer,
44 Ordering,
45 String,
46)
47from hypothesis.internal.conjecture.shrinking.choicetree import (
48 ChoiceTree,
49 prefix_selection_order,
50 random_selection_order,
51)
52from hypothesis.internal.floats import MAX_PRECISE_INTEGER
53
54if TYPE_CHECKING:
55 from random import Random
56 from typing import TypeAlias
57
58 from hypothesis.internal.conjecture.engine import ConjectureRunner
59
60ShrinkPredicateT: "TypeAlias" = Callable[[Union[ConjectureResult, _Overrun]], bool]
61
62
def sort_key(nodes: Sequence[ChoiceNode]) -> tuple[int, tuple[int, ...]]:
    """Return the shortlex key used to rank choice sequences by simplicity.

    The key is ``(length, indices)`` where ``indices`` contains
    ``choice_to_index(node.value, node.constraints)`` for every node, in
    order. Comparing two keys therefore says that x is simpler than y when x
    is shorter than y, or when they have the same length and the indices of x
    are lexicographically smaller than those of y.

    Why this ordering:

    1. A shorter sequence means fewer decisions were made while constructing
       the test case.
    2. At equal length, swapping a choice for one with a lower index swaps it
       for a simpler/smaller choice.
    3. Choices drawn early potentially get used in more places and so can
       have a larger effect on the final result, which is why earlier
       positions dominate the comparison.
    """
    length = len(nodes)
    indices = tuple(
        [choice_to_index(item.value, item.constraints) for item in nodes]
    )
    return (length, indices)
85
86
@dataclass
class ShrinkPass:
    """A shrink pass function together with its bookkeeping.

    Instances hash by ``name`` so that they can be used as dictionary keys.
    """

    # The callable implementing the pass.
    function: Any
    # Display name; defaults to ``function.__name__`` when not supplied.
    name: Optional[str] = None
    # Value remembered between runs of the pass (written and read by the
    # code that drives the pass).
    last_prefix: Any = ()

    # some execution statistics
    calls: int = 0
    misaligned: int = 0
    shrinks: int = 0
    deletions: int = 0

    def __post_init__(self):
        # An explicitly supplied name always wins.
        if self.name is not None:
            return
        self.name = self.function.__name__

    def __hash__(self):
        label = self.name
        return hash(label)
105
106
class StopShrinking(Exception):
    """Raised to abandon shrinking, e.g. when too many calls pass without progress."""
109
110
111class Shrinker:
112 """A shrinker is a child object of a ConjectureRunner which is designed to
113 manage the associated state of a particular shrink problem. That is, we
114 have some initial ConjectureData object and some property of interest
115 that it satisfies, and we want to find a ConjectureData object with a
116 shortlex (see sort_key above) smaller choice sequence that exhibits the same
117 property.
118
119 Currently the only property of interest we use is that the status is
120 INTERESTING and the interesting_origin takes on some fixed value, but we
121 may potentially be interested in other use cases later.
122 However we assume that data with a status < VALID never satisfies the predicate.
123
124 The shrinker keeps track of a value shrink_target which represents the
125 current best known ConjectureData object satisfying the predicate.
126 It refines this value by repeatedly running *shrink passes*, which are
127 methods that perform a series of transformations to the current shrink_target
128 and evaluate the underlying test function to find new ConjectureData
129 objects. If any of these satisfy the predicate, the shrink_target
130 is updated automatically. Shrinking runs until no shrink pass can
131 improve the shrink_target, at which point it stops. It may also be
132 terminated if the underlying engine throws RunIsComplete, but that
133 is handled by the calling code rather than the Shrinker.
134
135 =======================
136 Designing Shrink Passes
137 =======================
138
139 Generally a shrink pass is just any function that calls
140 cached_test_function and/or consider_new_nodes a number of times,
141 but there are a couple of useful things to bear in mind.
142
143 A shrink pass *makes progress* if running it changes self.shrink_target
144 (i.e. it tries a shortlex smaller ConjectureData object satisfying
145 the predicate). The desired end state of shrinking is to find a
146 value such that no shrink pass can make progress, i.e. that we
147 are at a local minimum for each shrink pass.
148
    In aid of this goal, the main invariant that a shrink pass must
150 satisfy is that whether it makes progress must be deterministic.
151 It is fine (encouraged even) for the specific progress it makes
152 to be non-deterministic, but if you run a shrink pass, it makes
153 no progress, and then you immediately run it again, it should
154 never succeed on the second time. This allows us to stop as soon
155 as we have run each shrink pass and seen no progress on any of
156 them.
157
158 This means that e.g. it's fine to try each of N deletions
159 or replacements in a random order, but it's not OK to try N random
160 deletions (unless you have already shrunk at least once, though we
161 don't currently take advantage of this loophole).
162
163 Shrink passes need to be written so as to be robust against
164 change in the underlying shrink target. It is generally safe
165 to assume that the shrink target does not change prior to the
166 point of first modification - e.g. if you change no bytes at
167 index ``i``, all spans whose start is ``<= i`` still exist,
168 as do all blocks, and the data object is still of length
169 ``>= i + 1``. This can only be violated by bad user code which
170 relies on an external source of non-determinism.
171
172 When the underlying shrink_target changes, shrink
173 passes should not run substantially more test_function calls
174 on success than they do on failure. Say, no more than a constant
175 factor more. In particular shrink passes should not iterate to a
176 fixed point.
177
178 This means that shrink passes are often written with loops that
179 are carefully designed to do the right thing in the case that no
180 shrinks occurred and try to adapt to any changes to do a reasonable
181 job. e.g. say we wanted to write a shrink pass that tried deleting
182 each individual choice (this isn't an especially good pass,
183 but it leads to a simple illustrative example), we might do it
184 by iterating over the choice sequence like so:
185
186 .. code-block:: python
187
188 i = 0
189 while i < len(self.shrink_target.nodes):
190 if not self.consider_new_nodes(
191 self.shrink_target.nodes[:i] + self.shrink_target.nodes[i + 1 :]
192 ):
193 i += 1
194
195 The reason for writing the loop this way is that i is always a
196 valid index into the current choice sequence, even if the current sequence
197 changes as a result of our actions. When the choice sequence changes,
198 we leave the index where it is rather than restarting from the
199 beginning, and carry on. This means that the number of steps we
200 run in this case is always bounded above by the number of steps
201 we would run if nothing works.
202
203 Another thing to bear in mind about shrink pass design is that
204 they should prioritise *progress*. If you have N operations that
205 you need to run, you should try to order them in such a way as
206 to avoid stalling, where you have long periods of test function
207 invocations where no shrinks happen. This is bad because whenever
208 we shrink we reduce the amount of work the shrinker has to do
    in future, and often speed up the test function, so ideally we
    want those shrinks to happen much earlier in the process.
211
212 Sometimes stalls are inevitable of course - e.g. if the pass
213 makes no progress, then the entire thing is just one long stall,
214 but it's helpful to design it so that stalls are less likely
215 in typical behaviour.
216
217 The two easiest ways to do this are:
218
219 * Just run the N steps in random order. As long as a
220 reasonably large proportion of the operations succeed, this
221 guarantees the expected stall length is quite short. The
222 book keeping for making sure this does the right thing when
223 it succeeds can be quite annoying.
224 * When you have any sort of nested loop, loop in such a way
225 that both loop variables change each time. This prevents
226 stalls which occur when one particular value for the outer
227 loop is impossible to make progress on, rendering the entire
228 inner loop into a stall.
229
230 However, although progress is good, too much progress can be
231 a bad sign! If you're *only* seeing successful reductions,
232 that's probably a sign that you are making changes that are
233 too timid. Two useful things to offset this:
234
235 * It's worth writing shrink passes which are *adaptive*, in
236 the sense that when operations seem to be working really
237 well we try to bundle multiple of them together. This can
238 often be used to turn what would be O(m) successful calls
239 into O(log(m)).
240 * It's often worth trying one or two special minimal values
241 before trying anything more fine grained (e.g. replacing
242 the whole thing with zero).
243
244 """
245
246 def derived_value(fn):
247 """It's useful during shrinking to have access to derived values of
248 the current shrink target.
249
250 This decorator allows you to define these as cached properties. They
251 are calculated once, then cached until the shrink target changes, then
252 recalculated the next time they are used."""
253
254 def accept(self):
255 try:
256 return self.__derived_values[fn.__name__]
257 except KeyError:
258 return self.__derived_values.setdefault(fn.__name__, fn(self))
259
260 accept.__name__ = fn.__name__
261 return property(accept)
262
    def __init__(
        self,
        engine: "ConjectureRunner",
        initial: Union[ConjectureData, ConjectureResult],
        predicate: Optional[ShrinkPredicateT],
        *,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ],
        explain: bool,
        in_target_phase: bool = False,
    ):
        """Create a shrinker for a particular engine, with a given starting
        point and predicate. When shrink() is called it will attempt to find an
        example for which predicate is True and which is strictly smaller than
        initial.

        ``engine`` is the runner used to execute candidates and whose call
        counters are read for statistics. ``initial`` is the starting shrink
        target. ``predicate`` decides whether a result still has the property
        of interest, and ``allow_transition(source, destination)`` may veto
        moving the shrink target from one result to another. At least one of
        the two must be given; whichever is omitted defaults to a function
        that always returns True.

        ``explain`` controls whether explain() does any work after shrinking.
        ``in_target_phase`` makes test function calls issued through
        cached_test_function extend the choice sequence fully instead of not
        at all.
        """
        assert predicate is not None or allow_transition is not None
        self.engine = engine
        self.__predicate = predicate or (lambda data: True)
        self.__allow_transition = allow_transition or (lambda source, destination: True)
        # Backing store for the @derived_value properties.
        self.__derived_values: dict = {}

        # Remembered so that shrink() can report how many choices were deleted.
        self.initial_size = len(initial.choices)
        # We keep track of the current best example on the shrink_target
        # attribute.
        self.shrink_target = initial
        self.clear_change_tracking()
        self.shrinks = 0

        # We terminate shrinks that seem to have reached their logical
        # conclusion: If we've called the underlying test function at
        # least self.max_stall times since the last time we shrunk,
        # it's time to stop shrinking.
        self.max_stall = 200
        # Baselines for the statistics reported by shrink() and for the
        # stall check in check_calls().
        self.initial_calls = self.engine.call_count
        self.initial_misaligned = self.engine.misaligned_count
        self.calls_at_last_shrink = self.initial_calls

        # The passes run by greedy_shrink(). fixate_shrink_passes() sorts
        # this list in place, so this is only the initial order.
        self.shrink_passes: list[ShrinkPass] = [
            ShrinkPass(self.try_trivial_spans),
            self.node_program("X" * 5),
            self.node_program("X" * 4),
            self.node_program("X" * 3),
            self.node_program("X" * 2),
            self.node_program("X" * 1),
            ShrinkPass(self.pass_to_descendant),
            ShrinkPass(self.reorder_spans),
            ShrinkPass(self.minimize_duplicated_choices),
            ShrinkPass(self.minimize_individual_choices),
            ShrinkPass(self.redistribute_numeric_pairs),
            ShrinkPass(self.lower_integers_together),
            ShrinkPass(self.lower_duplicated_characters),
        ]

        # Because the shrinker is also used to `pareto_optimise` in the target phase,
        # we sometimes want to allow extending buffers instead of aborting at the end.
        self.__extend: Union[Literal["full"], int] = "full" if in_target_phase else 0
        self.should_explain = explain
325
326 @derived_value # type: ignore
327 def cached_calculations(self):
328 return {}
329
330 def cached(self, *keys):
331 def accept(f):
332 cache_key = (f.__name__, *keys)
333 try:
334 return self.cached_calculations[cache_key]
335 except KeyError:
336 return self.cached_calculations.setdefault(cache_key, f())
337
338 return accept
339
340 @property
341 def calls(self) -> int:
342 """Return the number of calls that have been made to the underlying
343 test function."""
344 return self.engine.call_count
345
346 @property
347 def misaligned(self) -> int:
348 return self.engine.misaligned_count
349
350 def check_calls(self) -> None:
351 if self.calls - self.calls_at_last_shrink >= self.max_stall:
352 raise StopShrinking
353
354 def cached_test_function(
355 self, nodes: Sequence[ChoiceNode]
356 ) -> tuple[bool, Optional[Union[ConjectureResult, _Overrun]]]:
357 nodes = nodes[: len(self.nodes)]
358
359 if startswith(nodes, self.nodes):
360 return (True, None)
361
362 if sort_key(self.nodes) < sort_key(nodes):
363 return (False, None)
364
365 # sometimes our shrinking passes try obviously invalid things. We handle
366 # discarding them in one place here.
367 if any(not choice_permitted(node.value, node.constraints) for node in nodes):
368 return (False, None)
369
370 result = self.engine.cached_test_function(
371 [n.value for n in nodes], extend=self.__extend
372 )
373 previous = self.shrink_target
374 self.incorporate_test_data(result)
375 self.check_calls()
376 return (previous is not self.shrink_target, result)
377
378 def consider_new_nodes(self, nodes: Sequence[ChoiceNode]) -> bool:
379 return self.cached_test_function(nodes)[0]
380
381 def incorporate_test_data(self, data):
382 """Takes a ConjectureData or Overrun object updates the current
383 shrink_target if this data represents an improvement over it."""
384 if data.status < Status.VALID or data is self.shrink_target:
385 return
386 if (
387 self.__predicate(data)
388 and sort_key(data.nodes) < sort_key(self.shrink_target.nodes)
389 and self.__allow_transition(self.shrink_target, data)
390 ):
391 self.update_shrink_target(data)
392
393 def debug(self, msg: str) -> None:
394 self.engine.debug(msg)
395
396 @property
397 def random(self) -> "Random":
398 return self.engine.random
399
400 def shrink(self) -> None:
401 """Run the full set of shrinks and update shrink_target.
402
403 This method is "mostly idempotent" - calling it twice is unlikely to
404 have any effect, though it has a non-zero probability of doing so.
405 """
406
407 try:
408 self.initial_coarse_reduction()
409 self.greedy_shrink()
410 except StopShrinking:
411 # If we stopped shrinking because we're making slow progress (instead of
412 # reaching a local optimum), don't run the explain-phase logic.
413 self.should_explain = False
414 finally:
415 if self.engine.report_debug_info:
416
417 def s(n):
418 return "s" if n != 1 else ""
419
420 total_deleted = self.initial_size - len(self.shrink_target.choices)
421 calls = self.engine.call_count - self.initial_calls
422 misaligned = self.engine.misaligned_count - self.initial_misaligned
423
424 self.debug(
425 "---------------------\n"
426 "Shrink pass profiling\n"
427 "---------------------\n\n"
428 f"Shrinking made a total of {calls} call{s(calls)} of which "
429 f"{self.shrinks} shrank and {misaligned} were misaligned. This "
430 f"deleted {total_deleted} choices out of {self.initial_size}."
431 )
432 for useful in [True, False]:
433 self.debug("")
434 if useful:
435 self.debug("Useful passes:")
436 else:
437 self.debug("Useless passes:")
438 self.debug("")
439 for pass_ in sorted(
440 self.shrink_passes,
441 key=lambda t: (-t.calls, t.deletions, t.shrinks),
442 ):
443 if pass_.calls == 0:
444 continue
445 if (pass_.shrinks != 0) != useful:
446 continue
447
448 self.debug(
449 f" * {pass_.name} made {pass_.calls} call{s(pass_.calls)} of which "
450 f"{pass_.shrinks} shrank and {pass_.misaligned} were misaligned, "
451 f"deleting {pass_.deletions} choice{s(pass_.deletions)}."
452 )
453 self.debug("")
454 self.explain()
455
    def explain(self) -> None:
        """Record, on the shrink target, which argument slices can be varied
        without changing the outcome.

        Does nothing unless ``should_explain`` is set and the shrink target
        has ``arg_slices``. For each slice (largest first) it makes up to 500
        attempts at replacing the slice with freshly drawn choices; once 100
        of those attempts satisfy the predicate, the slice gets a note in
        ``shrink_target.slice_comments``. If more than one slice ends up with
        a note, it then varies all collected chunks together and stores a
        summary under the key ``(0, 0)``.
        """
        if not self.should_explain or not self.shrink_target.arg_slices:
            return

        # Make the stall limit checked by check_calls() effectively unreachable
        # from here on.
        self.max_stall = 2**100
        shrink_target = self.shrink_target
        nodes = self.nodes
        choices = self.choices
        # Maps each (start, end) slice to the replacement choice tuples that
        # were tried for it.
        chunks: dict[tuple[int, int], list[tuple[ChoiceT, ...]]] = defaultdict(list)

        # Before we start running experiments, let's check for known inputs which would
        # make them redundant. The shrinking process means that we've already tried many
        # variations on the minimal example, so this can save a lot of time.
        seen_passing_seq = self.engine.passing_choice_sequences(
            prefix=self.nodes[: min(self.shrink_target.arg_slices)[0]]
        )

        # Now that we've shrunk to a minimal failing example, it's time to try
        # varying each part that we've noted will go in the final report. Consider
        # slices in largest-first order
        for start, end in sorted(
            self.shrink_target.arg_slices, key=lambda x: (-(x[1] - x[0]), x)
        ):
            # Check for any previous examples that match the prefix and suffix,
            # so we can skip if we found a passing example while shrinking.
            if any(
                startswith(seen, nodes[:start]) and endswith(seen, nodes[end:])
                for seen in seen_passing_seq
            ):
                continue

            # Run our experiments
            n_same_failures = 0
            note = "or any other generated value"
            # TODO: is 100 same-failures out of 500 attempts a good heuristic?
            for n_attempt in range(500):  # pragma: no branch
                # no-branch here because we don't coverage-test the abort-at-500 logic.

                if n_attempt - 10 > n_same_failures * 5:
                    # stop early if we're seeing mostly invalid examples
                    break  # pragma: no cover

                # replace start:end with random values
                replacement = []
                for i in range(start, end):
                    node = nodes[i]
                    # Forced nodes keep their value; everything else is redrawn.
                    if not node.was_forced:
                        value = draw_choice(
                            node.type, node.constraints, random=self.random
                        )
                        node = node.copy(with_value=value)
                    replacement.append(node.value)

                attempt = choices[:start] + tuple(replacement) + choices[end:]
                result = self.engine.cached_test_function(attempt, extend="full")

                if result.status is Status.OVERRUN:
                    continue  # pragma: no cover  # flakily covered
                result = cast(ConjectureResult, result)
                if not (
                    len(attempt) == len(result.choices)
                    and endswith(result.nodes, nodes[end:])
                ):
                    # Turns out this was a variable-length part, so grab the infix...
                    for span1, span2 in zip(shrink_target.spans, result.spans):
                        assert span1.start == span2.start
                        assert span1.start <= start
                        assert span1.label == span2.label
                        if span1.start == start and span1.end == end:
                            result_end = span2.end
                            break
                    else:
                        raise NotImplementedError("Expected matching prefixes")

                    # ...and splice it between the original prefix and suffix,
                    # then run that spliced sequence instead.
                    attempt = (
                        choices[:start]
                        + result.choices[start:result_end]
                        + choices[end:]
                    )
                    chunks[(start, end)].append(result.choices[start:result_end])
                    result = self.engine.cached_test_function(attempt)

                    if result.status is Status.OVERRUN:
                        continue  # pragma: no cover  # flakily covered
                    result = cast(ConjectureResult, result)
                else:
                    chunks[(start, end)].append(result.choices[start:end])

                if shrink_target is not self.shrink_target:  # pragma: no cover
                    # If we've shrunk further without meaning to, bail out.
                    self.shrink_target.slice_comments.clear()
                    return
                if result.status is Status.VALID:
                    # The test passed, indicating that this param can't vary freely.
                    # However, it's really hard to write a simple and reliable covering
                    # test, because of our `seen_passing_buffers` check above.
                    break  # pragma: no cover
                elif self.__predicate(result):  # pragma: no branch
                    n_same_failures += 1
                    if n_same_failures >= 100:
                        self.shrink_target.slice_comments[(start, end)] = note
                        break

        # Finally, if we've found multiple independently-variable parts, check whether
        # they can all be varied together.
        if len(self.shrink_target.slice_comments) <= 1:
            return
        n_same_failures_together = 0
        chunks_by_start_index = sorted(chunks.items())
        for _ in range(500):  # pragma: no branch
            # no-branch here because we don't coverage-test the abort-at-500 logic.
            new_choices: list[ChoiceT] = []
            prev_end = 0
            # Rebuild a sequence from the original choices between chunks plus
            # one randomly picked recorded replacement per chunk.
            for (start, end), ls in chunks_by_start_index:
                assert prev_end <= start < end, "these chunks must be nonoverlapping"
                new_choices.extend(choices[prev_end:start])
                new_choices.extend(self.random.choice(ls))
                prev_end = end

            result = self.engine.cached_test_function(new_choices)

            # This *can't* be a shrink because none of the components were.
            assert shrink_target is self.shrink_target
            if result.status == Status.VALID:
                self.shrink_target.slice_comments[(0, 0)] = (
                    "The test sometimes passed when commented parts were varied together."
                )
                break  # Test passed, this param can't vary freely.
            elif self.__predicate(result):  # pragma: no branch
                n_same_failures_together += 1
                if n_same_failures_together >= 100:
                    self.shrink_target.slice_comments[(0, 0)] = (
                        "The test always failed when commented parts were varied together."
                    )
                    break
592
593 def greedy_shrink(self) -> None:
594 """Run a full set of greedy shrinks (that is, ones that will only ever
595 move to a better target) and update shrink_target appropriately.
596
597 This method iterates to a fixed point and so is idempontent - calling
598 it twice will have exactly the same effect as calling it once.
599 """
600 self.fixate_shrink_passes(self.shrink_passes)
601
602 def initial_coarse_reduction(self):
603 """Performs some preliminary reductions that should not be
604 repeated as part of the main shrink passes.
605
606 The main reason why these can't be included as part of shrink
607 passes is that they have much more ability to make the test
608 case "worse". e.g. they might rerandomise part of it, significantly
609 increasing the value of individual nodes, which works in direct
610 opposition to the lexical shrinking and will frequently undo
611 its work.
612 """
613 self.reduce_each_alternative()
614
615 @derived_value # type: ignore
616 def spans_starting_at(self):
617 result = [[] for _ in self.shrink_target.nodes]
618 for i, ex in enumerate(self.spans):
619 # We can have zero-length spans that start at the end
620 if ex.start < len(result):
621 result[ex.start].append(i)
622 return tuple(map(tuple, result))
623
    def reduce_each_alternative(self):
        """This is a pass that is designed to rerandomise use of the
        one_of strategy or things that look like it, in order to try
        to move from later strategies to earlier ones in the branch
        order.

        It does this by trying to systematically lower each value it
        finds that looks like it might be the branch decision for
        one_of, and then attempts to repair any changes in shape that
        this causes.

        A node is treated as a candidate when it is an unforced integer
        with ``min_value == 0`` and a value of at most 10.
        """
        i = 0
        # The length is re-read every iteration because the shrink target may
        # be replaced (and change length) as a result of the calls below.
        while i < len(self.shrink_target.nodes):
            nodes = self.shrink_target.nodes
            node = nodes[i]
            if (
                node.type == "integer"
                and not node.was_forced
                and node.value <= 10
                and node.constraints["min_value"] == 0
            ):
                assert isinstance(node.value, int)

                # We've found a plausible candidate for a ``one_of`` choice.
                # We now want to see if the shape of the test case actually depends
                # on it. If it doesn't, then we don't need to do this (comparatively
                # costly) pass, and can let much simpler lexicographic reduction
                # handle it later.
                #
                # We test this by trying to set the value to zero and seeing if the
                # shape changes, as measured by either changing the number of subsequent
                # nodes, or changing the nodes in such a way as to cause one of the
                # previous values to no longer be valid in its position.
                zero_attempt = self.cached_test_function(
                    nodes[:i] + (nodes[i].copy(with_value=0),) + nodes[i + 1 :]
                )[1]
                # Only inspect the attempt if it actually ran, did not itself
                # become the shrink target, and was at least VALID.
                if (
                    zero_attempt is not self.shrink_target
                    and zero_attempt is not None
                    and zero_attempt.status >= Status.VALID
                ):
                    changed_shape = len(zero_attempt.nodes) != len(nodes)

                    if not changed_shape:
                        for j in range(i + 1, len(nodes)):
                            zero_node = zero_attempt.nodes[j]
                            orig_node = nodes[j]
                            if (
                                zero_node.type != orig_node.type
                                or not choice_permitted(
                                    orig_node.value, zero_node.constraints
                                )
                            ):
                                changed_shape = True
                                break
                    if changed_shape:
                        # Try every smaller value in increasing order and stop
                        # at the first one that works.
                        for v in range(node.value):
                            if self.try_lower_node_as_alternative(i, v):
                                break
            i += 1
684
685 def try_lower_node_as_alternative(self, i, v):
686 """Attempt to lower `self.shrink_target.nodes[i]` to `v`,
687 while rerandomising and attempting to repair any subsequent
688 changes to the shape of the test case that this causes."""
689 nodes = self.shrink_target.nodes
690 if self.consider_new_nodes(
691 nodes[:i] + (nodes[i].copy(with_value=v),) + nodes[i + 1 :]
692 ):
693 return True
694
695 prefix = nodes[:i] + (nodes[i].copy(with_value=v),)
696 initial = self.shrink_target
697 spans = self.spans_starting_at[i]
698 for _ in range(3):
699 random_attempt = self.engine.cached_test_function(
700 [n.value for n in prefix], extend=len(nodes)
701 )
702 if random_attempt.status < Status.VALID:
703 continue
704 self.incorporate_test_data(random_attempt)
705 for j in spans:
706 initial_ex = initial.spans[j]
707 attempt_ex = random_attempt.spans[j]
708 contents = random_attempt.nodes[attempt_ex.start : attempt_ex.end]
709 self.consider_new_nodes(nodes[:i] + contents + nodes[initial_ex.end :])
710 if initial is not self.shrink_target:
711 return True
712 return False
713
714 @derived_value # type: ignore
715 def shrink_pass_choice_trees(self) -> dict[Any, ChoiceTree]:
716 return defaultdict(ChoiceTree)
717
718 def step(self, shrink_pass: ShrinkPass, *, random_order: bool = False) -> bool:
719 tree = self.shrink_pass_choice_trees[shrink_pass]
720 if tree.exhausted:
721 return False
722
723 initial_shrinks = self.shrinks
724 initial_calls = self.calls
725 initial_misaligned = self.misaligned
726 size = len(self.shrink_target.choices)
727 assert shrink_pass.name is not None
728 self.engine.explain_next_call_as(shrink_pass.name)
729
730 if random_order:
731 selection_order = random_selection_order(self.random)
732 else:
733 selection_order = prefix_selection_order(shrink_pass.last_prefix)
734
735 try:
736 shrink_pass.last_prefix = tree.step(
737 selection_order,
738 lambda chooser: shrink_pass.function(chooser),
739 )
740 finally:
741 shrink_pass.calls += self.calls - initial_calls
742 shrink_pass.misaligned += self.misaligned - initial_misaligned
743 shrink_pass.shrinks += self.shrinks - initial_shrinks
744 shrink_pass.deletions += size - len(self.shrink_target.choices)
745 self.engine.clear_call_explanation()
746 return True
747
    def fixate_shrink_passes(self, passes: list[ShrinkPass]) -> None:
        """Run steps from each pass in ``passes`` until the current shrink target
        is a fixed point of all of them.

        Each outer iteration runs every pass in turn, repeating a pass until
        it has failed ``max_failures`` times in a row or its choice tree is
        exhausted. The outer loop stops once an entire iteration goes by
        without any step running. ``passes`` is sorted in place at the end of
        every iteration. ``self.max_stall`` may be increased along the way.
        """
        any_ran = True
        while any_ran:
            any_ran = False

            # Per-pass score for this iteration: -1 reduced the length,
            # 0 changed the target otherwise, 1 achieved nothing.
            reordering = {}

            # We run remove_discarded after every pass to do cleanup
            # keeping track of whether that actually works. Either there is
            # no discarded data and it is basically free, or it reliably works
            # and deletes data, or it doesn't work. In that latter case we turn
            # it off for the rest of this loop through the passes, but will
            # try again once all of the passes have been run.
            can_discard = self.remove_discarded()

            calls_at_loop_start = self.calls

            # We keep track of how many calls can be made by a single step
            # without making progress and use this to test how much to pad
            # out self.max_stall by as we go along.
            max_calls_per_failing_step = 1

            for sp in passes:
                if can_discard:
                    can_discard = self.remove_discarded()

                before_sp = self.shrink_target

                # Run the shrink pass until it fails to make any progress
                # max_failures times in a row. This implicitly boosts shrink
                # passes that are more likely to work.
                failures = 0
                max_failures = 20
                while failures < max_failures:
                    # We don't allow more than max_stall consecutive failures
                    # to shrink, but this means that if we're unlucky and the
                    # shrink passes are in a bad order where only the ones at
                    # the end are useful, if we're not careful this heuristic
                    # might stop us before we've tried everything. In order to
                    # avoid that happening, we make sure that there's always
                    # plenty of breathing room to make it through a single
                    # iteration of the fixate_shrink_passes loop.
                    self.max_stall = max(
                        self.max_stall,
                        2 * max_calls_per_failing_step
                        + (self.calls - calls_at_loop_start),
                    )

                    prev = self.shrink_target
                    initial_calls = self.calls
                    # It's better for us to run shrink passes in a deterministic
                    # order, to avoid repeat work, but this can cause us to create
                    # long stalls when there are a lot of steps which fail to do
                    # anything useful. In order to avoid this, once we've noticed
                    # we're in a stall (i.e. half of max_failures calls have failed
                    # to do anything) we switch to randomly jumping around. If we
                    # find a success then we'll resume deterministic order from
                    # there which, with any luck, is in a new good region.
                    if not self.step(sp, random_order=failures >= max_failures // 2):
                        # step returns False when there is nothing to do because
                        # the entire choice tree is exhausted. If this happens
                        # we break because we literally can't run this pass any
                        # more than we already have until something else makes
                        # progress.
                        break
                    any_ran = True

                    # Don't count steps that didn't actually try to do
                    # anything as failures. Otherwise, this call is a failure
                    # if it failed to make any changes to the shrink target.
                    if initial_calls != self.calls:
                        if prev is not self.shrink_target:
                            failures = 0
                        else:
                            max_calls_per_failing_step = max(
                                max_calls_per_failing_step, self.calls - initial_calls
                            )
                            failures += 1

                # We reorder the shrink passes so that on our next run through
                # we try good ones first. The rule is that shrink passes that
                # did nothing useful are the worst, shrink passes that reduced
                # the length are the best.
                if self.shrink_target is before_sp:
                    reordering[sp] = 1
                elif len(self.choices) < len(before_sp.choices):
                    reordering[sp] = -1
                else:
                    reordering[sp] = 0

            # list.sort is stable, so passes with equal scores keep their
            # relative order.
            passes.sort(key=reordering.__getitem__)
841
842 @property
843 def nodes(self) -> tuple[ChoiceNode, ...]:
844 return self.shrink_target.nodes
845
846 @property
847 def choices(self) -> tuple[ChoiceT, ...]:
848 return self.shrink_target.choices
849
850 @property
851 def spans(self) -> Spans:
852 return self.shrink_target.spans
853
854 @derived_value # type: ignore
855 def spans_by_label(self):
856 """
857 A mapping of labels to a list of spans with that label. Spans in the list
858 are ordered by their normal index order.
859 """
860
861 spans_by_label = defaultdict(list)
862 for ex in self.spans:
863 spans_by_label[ex.label].append(ex)
864 return dict(spans_by_label)
865
866 @derived_value # type: ignore
867 def distinct_labels(self):
868 return sorted(self.spans_by_label, key=str)
869
870 def pass_to_descendant(self, chooser):
871 """Attempt to replace each span with a descendant span.
872
873 This is designed to deal with strategies that call themselves
874 recursively. For example, suppose we had:
875
876 binary_tree = st.deferred(
877 lambda: st.one_of(
878 st.integers(), st.tuples(binary_tree, binary_tree)))
879
880 This pass guarantees that we can replace any binary tree with one of
881 its subtrees - each of those will create an interval that the parent
882 could validly be replaced with, and this pass will try doing that.
883
884 This is pretty expensive - it takes O(len(intervals)^2) - so we run it
885 late in the process when we've got the number of intervals as far down
886 as possible.
887 """
888
889 label = chooser.choose(
890 self.distinct_labels, lambda l: len(self.spans_by_label[l]) >= 2
891 )
892
893 ls = self.spans_by_label[label]
894 i = chooser.choose(range(len(ls) - 1))
895 ancestor = ls[i]
896
897 if i + 1 == len(ls) or ls[i + 1].start >= ancestor.end:
898 return
899
900 @self.cached(label, i)
901 def descendants():
902 lo = i + 1
903 hi = len(ls)
904 while lo + 1 < hi:
905 mid = (lo + hi) // 2
906 if ls[mid].start >= ancestor.end:
907 hi = mid
908 else:
909 lo = mid
910 return [t for t in ls[i + 1 : hi] if t.choice_count < ancestor.choice_count]
911
912 descendant = chooser.choose(descendants, lambda ex: ex.choice_count > 0)
913
914 assert ancestor.start <= descendant.start
915 assert ancestor.end >= descendant.end
916 assert descendant.choice_count < ancestor.choice_count
917
918 self.consider_new_nodes(
919 self.nodes[: ancestor.start]
920 + self.nodes[descendant.start : descendant.end]
921 + self.nodes[ancestor.end :]
922 )
923
924 def lower_common_node_offset(self):
925 """Sometimes we find ourselves in a situation where changes to one part
926 of the choice sequence unlock changes to other parts. Sometimes this is
927 good, but sometimes this can cause us to exhibit exponential slow
928 downs!
929
930 e.g. suppose we had the following:
931
932 m = draw(integers(min_value=0))
933 n = draw(integers(min_value=0))
934 assert abs(m - n) > 1
935
936 If this fails then we'll end up with a loop where on each iteration we
937 reduce each of m and n by 2 - m can't go lower because of n, then n
938 can't go lower because of m.
939
940 This will take us O(m) iterations to complete, which is exponential in
941 the data size, as we gradually zig zag our way towards zero.
942
943 This can only happen if we're failing to reduce the size of the choice
944 sequence: The number of iterations that reduce the length of the choice
945 sequence is bounded by that length.
946
947 So what we do is this: We keep track of which nodes are changing, and
948 then if there's some non-zero common offset to them we try and minimize
949 them all at once by lowering that offset.
950
951 This may not work, and it definitely won't get us out of all possible
952 exponential slow downs (an example of where it doesn't is where the
953 shape of the nodes changes as a result of this bouncing behaviour),
954 but it fails fast when it doesn't work and gets us out of a really
955 nastily slow case when it does.
956 """
957 if len(self.__changed_nodes) <= 1:
958 return
959
960 changed = []
961 for i in sorted(self.__changed_nodes):
962 node = self.nodes[i]
963 if node.trivial or node.type != "integer":
964 continue
965 changed.append(node)
966
967 if not changed:
968 return
969
970 ints = [
971 abs(node.value - node.constraints["shrink_towards"]) for node in changed
972 ]
973 offset = min(ints)
974 assert offset > 0
975
976 for i in range(len(ints)):
977 ints[i] -= offset
978
979 st = self.shrink_target
980
981 def offset_node(node, n):
982 return (
983 node.index,
984 node.index + 1,
985 [node.copy(with_value=node.constraints["shrink_towards"] + n)],
986 )
987
988 def consider(n, sign):
989 return self.consider_new_nodes(
990 replace_all(
991 st.nodes,
992 [
993 offset_node(node, sign * (n + v))
994 for node, v in zip(changed, ints)
995 ],
996 )
997 )
998
999 # shrink from both sides
1000 Integer.shrink(offset, lambda n: consider(n, 1))
1001 Integer.shrink(offset, lambda n: consider(n, -1))
1002 self.clear_change_tracking()
1003
1004 def clear_change_tracking(self):
1005 self.__last_checked_changed_at = self.shrink_target
1006 self.__all_changed_nodes = set()
1007
1008 def mark_changed(self, i):
1009 self.__changed_nodes.add(i)
1010
1011 @property
1012 def __changed_nodes(self) -> set[int]:
1013 if self.__last_checked_changed_at is self.shrink_target:
1014 return self.__all_changed_nodes
1015
1016 prev_target = self.__last_checked_changed_at
1017 new_target = self.shrink_target
1018 assert prev_target is not new_target
1019 prev_nodes = prev_target.nodes
1020 new_nodes = new_target.nodes
1021 assert sort_key(new_target.nodes) < sort_key(prev_target.nodes)
1022
1023 if len(prev_nodes) != len(new_nodes) or any(
1024 n1.type != n2.type for n1, n2 in zip(prev_nodes, new_nodes)
1025 ):
1026 # should we check constraints are equal as well?
1027 self.__all_changed_nodes = set()
1028 else:
1029 assert len(prev_nodes) == len(new_nodes)
1030 for i, (n1, n2) in enumerate(zip(prev_nodes, new_nodes)):
1031 assert n1.type == n2.type
1032 if not choice_equal(n1.value, n2.value):
1033 self.__all_changed_nodes.add(i)
1034
1035 return self.__all_changed_nodes
1036
1037 def update_shrink_target(self, new_target):
1038 assert isinstance(new_target, ConjectureResult)
1039 self.shrinks += 1
1040 # If we are just taking a long time to shrink we don't want to
1041 # trigger this heuristic, so whenever we shrink successfully
1042 # we give ourselves a bit of breathing room to make sure we
1043 # would find a shrink that took that long to find the next time.
1044 # The case where we're taking a long time but making steady
1045 # progress is handled by `finish_shrinking_deadline` in engine.py
1046 self.max_stall = max(
1047 self.max_stall, (self.calls - self.calls_at_last_shrink) * 2
1048 )
1049 self.calls_at_last_shrink = self.calls
1050 self.shrink_target = new_target
1051 self.__derived_values = {}
1052
1053 def try_shrinking_nodes(self, nodes, n):
1054 """Attempts to replace each node in the nodes list with n. Returns
1055 True if it succeeded (which may include some additional modifications
1056 to shrink_target).
1057
1058 In current usage it is expected that each of the nodes currently have
1059 the same value and choice_type, although this is not essential. Note that
1060 n must be < the node at min(nodes) or this is not a valid shrink.
1061
1062 This method will attempt to do some small amount of work to delete data
1063 that occurs after the end of the nodes. This is useful for cases where
1064 there is some size dependency on the value of a node.
1065 """
1066 # If the length of the shrink target has changed from under us such that
1067 # the indices are out of bounds, give up on the replacement.
1068 # TODO_BETTER_SHRINK: we probably want to narrow down the root cause here at some point.
1069 if any(node.index >= len(self.nodes) for node in nodes):
1070 return # pragma: no cover
1071
1072 initial_attempt = replace_all(
1073 self.nodes,
1074 [(node.index, node.index + 1, [node.copy(with_value=n)]) for node in nodes],
1075 )
1076
1077 attempt = self.cached_test_function(initial_attempt)[1]
1078
1079 if attempt is None:
1080 return False
1081
1082 if attempt is self.shrink_target:
1083 # if the initial shrink was a success, try lowering offsets.
1084 self.lower_common_node_offset()
1085 return True
1086
1087 # If this produced something completely invalid we ditch it
1088 # here rather than trying to persevere.
1089 if attempt.status is Status.OVERRUN:
1090 return False
1091
1092 if attempt.status is Status.INVALID:
1093 return False
1094
1095 if attempt.misaligned_at is not None:
1096 # we're invalid due to a misalignment in the tree. We'll try to fix
1097 # a very specific type of misalignment here: where we have a node of
1098 # {"size": n} and tried to draw the same node, but with {"size": m < n}.
1099 # This can occur with eg
1100 #
1101 # n = data.draw_integer()
1102 # s = data.draw_string(min_size=n)
1103 #
1104 # where we try lowering n, resulting in the test_function drawing a lower
1105 # min_size than our attempt had for the draw_string node.
1106 #
1107 # We'll now try realigning this tree by:
1108 # * replacing the constraints in our attempt with what test_function tried
1109 # to draw in practice
1110 # * truncating the value of that node to match min_size
1111 #
1112 # This helps in the specific case of drawing a value and then drawing
1113 # a collection of that size...and not much else. In practice this
1114 # helps because this antipattern is fairly common.
1115
1116 # TODO we'll probably want to apply the same trick as in the valid
1117 # case of this function of preserving from the right instead of
1118 # preserving from the left. see test_can_shrink_variable_string_draws.
1119
1120 (index, attempt_choice_type, attempt_constraints, _attempt_forced) = (
1121 attempt.misaligned_at
1122 )
1123 node = self.nodes[index]
1124 if node.type != attempt_choice_type:
1125 return False # pragma: no cover
1126 if node.was_forced:
1127 return False # pragma: no cover
1128
1129 if node.type in {"string", "bytes"}:
1130 # if the size *increased*, we would have to guess what to pad with
1131 # in order to try fixing up this attempt. Just give up.
1132 if node.constraints["min_size"] <= attempt_constraints["min_size"]:
1133 # attempts which increase min_size tend to overrun rather than
1134 # be misaligned, making a covering case difficult.
1135 return False # pragma: no cover
1136 # the size decreased in our attempt. Try again, but truncate the value
1137 # to that size by removing any elements past min_size.
1138 return self.consider_new_nodes(
1139 initial_attempt[: node.index]
1140 + [
1141 initial_attempt[node.index].copy(
1142 with_constraints=attempt_constraints,
1143 with_value=initial_attempt[node.index].value[
1144 : attempt_constraints["min_size"]
1145 ],
1146 )
1147 ]
1148 + initial_attempt[node.index :]
1149 )
1150
1151 lost_nodes = len(self.nodes) - len(attempt.nodes)
1152 if lost_nodes <= 0:
1153 return False
1154
1155 start = nodes[0].index
1156 end = nodes[-1].index + 1
1157 # We now look for contiguous regions to delete that might help fix up
1158 # this failed shrink. We only look for contiguous regions of the right
1159 # lengths because doing anything more than that starts to get very
1160 # expensive. See minimize_individual_choices for where we
1161 # try to be more aggressive.
1162 regions_to_delete = {(end, end + lost_nodes)}
1163
1164 for ex in self.spans:
1165 if ex.start > start:
1166 continue
1167 if ex.end <= end:
1168 continue
1169
1170 if ex.index >= len(attempt.spans):
1171 continue # pragma: no cover
1172
1173 replacement = attempt.spans[ex.index]
1174 in_original = [c for c in ex.children if c.start >= end]
1175 in_replaced = [c for c in replacement.children if c.start >= end]
1176
1177 if len(in_replaced) >= len(in_original) or not in_replaced:
1178 continue
1179
1180 # We've found a span where some of the children went missing
1181 # as a result of this change, and just replacing it with the data
1182 # it would have had and removing the spillover didn't work. This
1183 # means that some of its children towards the right must be
1184 # important, so we try to arrange it so that it retains its
1185 # rightmost children instead of its leftmost.
1186 regions_to_delete.add(
1187 (in_original[0].start, in_original[-len(in_replaced)].start)
1188 )
1189
1190 for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):
1191 try_with_deleted = initial_attempt[:u] + initial_attempt[v:]
1192 if self.consider_new_nodes(try_with_deleted):
1193 return True
1194
1195 return False
1196
1197 def remove_discarded(self):
1198 """Try removing all bytes marked as discarded.
1199
1200 This is primarily to deal with data that has been ignored while
1201 doing rejection sampling - e.g. as a result of an integer range, or a
1202 filtered strategy.
1203
1204 Such data will also be handled by the adaptive_example_deletion pass,
1205 but that pass is necessarily more conservative and will try deleting
1206 each interval individually. The common case is that all data drawn and
1207 rejected can just be thrown away immediately in one block, so this pass
1208 will be much faster than trying each one individually when it works.
1209
1210 returns False if there is discarded data and removing it does not work,
1211 otherwise returns True.
1212 """
1213 while self.shrink_target.has_discards:
1214 discarded = []
1215
1216 for ex in self.shrink_target.spans:
1217 if (
1218 ex.choice_count > 0
1219 and ex.discarded
1220 and (not discarded or ex.start >= discarded[-1][-1])
1221 ):
1222 discarded.append((ex.start, ex.end))
1223
1224 # This can happen if we have discards but they are all of
1225 # zero length. This shouldn't happen very often so it's
1226 # faster to check for it here than at the point of example
1227 # generation.
1228 if not discarded:
1229 break
1230
1231 attempt = list(self.nodes)
1232 for u, v in reversed(discarded):
1233 del attempt[u:v]
1234
1235 if not self.consider_new_nodes(tuple(attempt)):
1236 return False
1237 return True
1238
1239 @derived_value # type: ignore
1240 def duplicated_nodes(self):
1241 """Returns a list of nodes grouped (choice_type, value)."""
1242 duplicates = defaultdict(list)
1243 for node in self.nodes:
1244 duplicates[(node.type, choice_key(node.value))].append(node)
1245 return list(duplicates.values())
1246
1247 def node_program(self, program: str) -> ShrinkPass:
1248 return ShrinkPass(
1249 lambda chooser: self._node_program(chooser, program),
1250 name=f"node_program_{program}",
1251 )
1252
1253 def _node_program(self, chooser, program):
1254 n = len(program)
1255 # Adaptively attempt to run the node program at the current
1256 # index. If this successfully applies the node program ``k`` times
1257 # then this runs in ``O(log(k))`` test function calls.
1258 i = chooser.choose(range(len(self.nodes) - n + 1))
1259
1260 # First, run the node program at the chosen index. If this fails,
1261 # don't do any extra work, so that failure is as cheap as possible.
1262 if not self.run_node_program(i, program, original=self.shrink_target):
1263 return
1264
1265 # Because we run in a random order we will often find ourselves in the middle
1266 # of a region where we could run the node program. We thus start by moving
1267 # left to the beginning of that region if possible in order to to start from
1268 # the beginning of that region.
1269 def offset_left(k):
1270 return i - k * n
1271
1272 i = offset_left(
1273 find_integer(
1274 lambda k: self.run_node_program(
1275 offset_left(k), program, original=self.shrink_target
1276 )
1277 )
1278 )
1279
1280 original = self.shrink_target
1281 # Now try to run the node program multiple times here.
1282 find_integer(
1283 lambda k: self.run_node_program(i, program, original=original, repeats=k)
1284 )
1285
1286 def minimize_duplicated_choices(self, chooser):
1287 """Find choices that have been duplicated in multiple places and attempt
1288 to minimize all of the duplicates simultaneously.
1289
1290 This lets us handle cases where two values can't be shrunk
1291 independently of each other but can easily be shrunk together.
1292 For example if we had something like:
1293
1294 ls = data.draw(lists(integers()))
1295 y = data.draw(integers())
1296 assert y not in ls
1297
1298 Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
1299 to replace both 3s with 0, this would be a valid shrink, but if we were
1300 to replace either 3 with 0 on its own the test would start passing.
1301
1302 It is also useful for when that duplication is accidental and the value
1303 of the choices don't matter very much because it allows us to replace
1304 more values at once.
1305 """
1306 nodes = chooser.choose(self.duplicated_nodes)
1307 # we can't lower any nodes which are trivial. try proceeding with the
1308 # remaining nodes.
1309 nodes = [node for node in nodes if not node.trivial]
1310 if len(nodes) <= 1:
1311 return
1312
1313 self.minimize_nodes(nodes)
1314
1315 def redistribute_numeric_pairs(self, chooser):
1316 """If there is a sum of generated numbers that we need their sum
1317 to exceed some bound, lowering one of them requires raising the
1318 other. This pass enables that."""
1319
1320 # look for a pair of nodes (node1, node2) which are both numeric
1321 # and aren't separated by too many other nodes. We'll decrease node1 and
1322 # increase node2 (note that the other way around doesn't make sense as
1323 # it's strictly worse in the ordering).
1324 def can_choose_node(node):
1325 # don't choose nan, inf, or floats above the threshold where f + 1 > f
1326 # (which is not necessarily true for floats above MAX_PRECISE_INTEGER).
1327 # The motivation for the last condition is to avoid trying weird
1328 # non-shrinks where we raise one node and think we lowered another
1329 # (but didn't).
1330 return node.type in {"integer", "float"} and not (
1331 node.type == "float"
1332 and (math.isnan(node.value) or abs(node.value) >= MAX_PRECISE_INTEGER)
1333 )
1334
1335 node1 = chooser.choose(
1336 self.nodes,
1337 lambda node: can_choose_node(node) and not node.trivial,
1338 )
1339 node2 = chooser.choose(
1340 self.nodes,
1341 lambda node: can_choose_node(node)
1342 # Note that it's fine for node2 to be trivial, because we're going to
1343 # explicitly make it *not* trivial by adding to its value.
1344 and not node.was_forced
1345 # to avoid quadratic behavior, scan ahead only a small amount for
1346 # the related node.
1347 and node1.index < node.index <= node1.index + 4,
1348 )
1349
1350 m: Union[int, float] = node1.value
1351 n: Union[int, float] = node2.value
1352
1353 def boost(k: int) -> bool:
1354 if k > m:
1355 return False
1356
1357 try:
1358 v1 = m - k
1359 v2 = n + k
1360 except OverflowError: # pragma: no cover
1361 # if n or m is a float and k is over sys.float_info.max, coercing
1362 # k to a float will overflow.
1363 return False
1364
1365 # if we've increased node2 to the point that we're past max precision,
1366 # give up - things have become too unstable.
1367 if node1.type == "float" and v2 >= MAX_PRECISE_INTEGER:
1368 return False
1369
1370 return self.consider_new_nodes(
1371 self.nodes[: node1.index]
1372 + (node1.copy(with_value=v1),)
1373 + self.nodes[node1.index + 1 : node2.index]
1374 + (node2.copy(with_value=v2),)
1375 + self.nodes[node2.index + 1 :]
1376 )
1377
1378 find_integer(boost)
1379
1380 def lower_integers_together(self, chooser):
1381 node1 = chooser.choose(
1382 self.nodes, lambda n: n.type == "integer" and not n.trivial
1383 )
1384 # Search up to 3 nodes ahead, to avoid quadratic time.
1385 node2 = self.nodes[
1386 chooser.choose(
1387 range(node1.index + 1, min(len(self.nodes), node1.index + 3 + 1)),
1388 lambda i: self.nodes[i].type == "integer"
1389 and not self.nodes[i].was_forced,
1390 )
1391 ]
1392
1393 # one might expect us to require node2 to be nontrivial, and to minimize
1394 # the node which is closer to its shrink_towards, rather than node1
1395 # unconditionally. In reality, it's acceptable for us to transition node2
1396 # from trivial to nontrivial, because the shrink ordering is dominated by
1397 # the complexity of the earlier node1. What matters is minimizing node1.
1398 shrink_towards = node1.constraints["shrink_towards"]
1399
1400 def consider(n):
1401 return self.consider_new_nodes(
1402 self.nodes[: node1.index]
1403 + (node1.copy(with_value=node1.value - n),)
1404 + self.nodes[node1.index + 1 : node2.index]
1405 + (node2.copy(with_value=node2.value - n),)
1406 + self.nodes[node2.index + 1 :]
1407 )
1408
1409 find_integer(lambda n: consider(shrink_towards - n))
1410 find_integer(lambda n: consider(n - shrink_towards))
1411
1412 def lower_duplicated_characters(self, chooser):
1413 """
1414 Select two string choices no more than 4 choices apart and simultaneously
1415 lower characters which appear in both strings. This helps cases where the
1416 same character must appear in two strings, but the actual value of the
1417 character is not relevant.
1418
1419 This shrinking pass currently only tries lowering *all* instances of the
1420 duplicated character in both strings. So for instance, given two choices:
1421
1422 "bbac"
1423 "abbb"
1424
1425 we would try lowering all five of the b characters simultaneously. This
1426 may fail to shrink some cases where only certain character indices are
1427 correlated, for instance if only the b at index 1 could be lowered
1428 simultaneously and the rest did in fact actually have to be a `b`.
1429
1430 It would be nice to try shrinking that case as well, but we would need good
1431 safeguards because it could get very expensive to try all combinations.
1432 I expect lowering all duplicates to handle most cases in the meantime.
1433 """
1434 node1 = chooser.choose(
1435 self.nodes, lambda n: n.type == "string" and not n.trivial
1436 )
1437
1438 # limit search to up to 4 choices ahead, to avoid quadratic behavior
1439 node2 = self.nodes[
1440 chooser.choose(
1441 range(node1.index + 1, min(len(self.nodes), node1.index + 1 + 4)),
1442 lambda i: self.nodes[i].type == "string" and not self.nodes[i].trivial
1443 # select nodes which have at least one of the same character present
1444 and set(node1.value) & set(self.nodes[i].value),
1445 )
1446 ]
1447
1448 duplicated_characters = set(node1.value) & set(node2.value)
1449 # deterministic ordering
1450 char = chooser.choose(sorted(duplicated_characters))
1451 intervals = node1.constraints["intervals"]
1452
1453 def copy_node(node, n):
1454 # replace all duplicate characters in each string. This might miss
1455 # some shrinks compared to only replacing some, but trying all possible
1456 # combinations of indices could get expensive if done without some
1457 # thought.
1458 return node.copy(
1459 with_value=node.value.replace(char, intervals.char_in_shrink_order(n))
1460 )
1461
1462 Integer.shrink(
1463 intervals.index_from_char_in_shrink_order(char),
1464 lambda n: self.consider_new_nodes(
1465 self.nodes[: node1.index]
1466 + (copy_node(node1, n),)
1467 + self.nodes[node1.index + 1 : node2.index]
1468 + (copy_node(node2, n),)
1469 + self.nodes[node2.index + 1 :]
1470 ),
1471 )
1472
1473 def minimize_nodes(self, nodes):
1474 choice_type = nodes[0].type
1475 value = nodes[0].value
1476 # unlike choice_type and value, constraints are *not* guaranteed to be equal among all
1477 # passed nodes. We arbitrarily use the constraints of the first node. I think
1478 # this is unsound (= leads to us trying shrinks that could not have been
1479 # generated), but those get discarded at test-time, and this enables useful
1480 # slips where constraints are not equal but are close enough that doing the
1481 # same operation on both basically just works.
1482 constraints = nodes[0].constraints
1483 assert all(
1484 node.type == choice_type and choice_equal(node.value, value)
1485 for node in nodes
1486 )
1487
1488 if choice_type == "integer":
1489 shrink_towards = constraints["shrink_towards"]
1490 # try shrinking from both sides towards shrink_towards.
1491 # we're starting from n = abs(shrink_towards - value). Because the
1492 # shrinker will not check its starting value, we need to try
1493 # shrinking to n first.
1494 self.try_shrinking_nodes(nodes, abs(shrink_towards - value))
1495 Integer.shrink(
1496 abs(shrink_towards - value),
1497 lambda n: self.try_shrinking_nodes(nodes, shrink_towards + n),
1498 )
1499 Integer.shrink(
1500 abs(shrink_towards - value),
1501 lambda n: self.try_shrinking_nodes(nodes, shrink_towards - n),
1502 )
1503 elif choice_type == "float":
1504 self.try_shrinking_nodes(nodes, abs(value))
1505 Float.shrink(
1506 abs(value),
1507 lambda val: self.try_shrinking_nodes(nodes, val),
1508 )
1509 Float.shrink(
1510 abs(value),
1511 lambda val: self.try_shrinking_nodes(nodes, -val),
1512 )
1513 elif choice_type == "boolean":
1514 # must be True, otherwise would be trivial and not selected.
1515 assert value is True
1516 # only one thing to try: false!
1517 self.try_shrinking_nodes(nodes, False)
1518 elif choice_type == "bytes":
1519 Bytes.shrink(
1520 value,
1521 lambda val: self.try_shrinking_nodes(nodes, val),
1522 min_size=constraints["min_size"],
1523 )
1524 elif choice_type == "string":
1525 String.shrink(
1526 value,
1527 lambda val: self.try_shrinking_nodes(nodes, val),
1528 intervals=constraints["intervals"],
1529 min_size=constraints["min_size"],
1530 )
1531 else:
1532 raise NotImplementedError
1533
1534 def try_trivial_spans(self, chooser):
1535 i = chooser.choose(range(len(self.spans)))
1536
1537 prev = self.shrink_target
1538 nodes = self.shrink_target.nodes
1539 ex = self.spans[i]
1540 prefix = nodes[: ex.start]
1541 replacement = tuple(
1542 [
1543 (
1544 node
1545 if node.was_forced
1546 else node.copy(
1547 with_value=choice_from_index(0, node.type, node.constraints)
1548 )
1549 )
1550 for node in nodes[ex.start : ex.end]
1551 ]
1552 )
1553 suffix = nodes[ex.end :]
1554 attempt = self.cached_test_function(prefix + replacement + suffix)[1]
1555
1556 if self.shrink_target is not prev:
1557 return
1558
1559 if isinstance(attempt, ConjectureResult):
1560 new_ex = attempt.spans[i]
1561 new_replacement = attempt.nodes[new_ex.start : new_ex.end]
1562 self.consider_new_nodes(prefix + new_replacement + suffix)
1563
1564 def minimize_individual_choices(self, chooser):
1565 """Attempt to minimize each choice in sequence.
1566
1567 This is the pass that ensures that e.g. each integer we draw is a
1568 minimum value. So it's the part that guarantees that if we e.g. do
1569
1570 x = data.draw(integers())
1571 assert x < 10
1572
1573 then in our shrunk example, x = 10 rather than say 97.
1574
1575 If we are unsuccessful at minimizing a choice of interest we then
1576 check if that's because it's changing the size of the test case and,
1577 if so, we also make an attempt to delete parts of the test case to
1578 see if that fixes it.
1579
1580 We handle most of the common cases in try_shrinking_nodes which is
1581 pretty good at clearing out large contiguous blocks of dead space,
1582 but it fails when there is data that has to stay in particular places
1583 in the list.
1584 """
1585 node = chooser.choose(self.nodes, lambda node: not node.trivial)
1586 initial_target = self.shrink_target
1587
1588 self.minimize_nodes([node])
1589 if self.shrink_target is not initial_target:
1590 # the shrink target changed, so our shrink worked. Defer doing
1591 # anything more intelligent until this shrink fails.
1592 return
1593
1594 # the shrink failed. One particularly common case where minimizing a
1595 # node can fail is the antipattern of drawing a size and then drawing a
1596 # collection of that size, or more generally when there is a size
1597 # dependency on some single node. We'll explicitly try and fix up this
1598 # common case here: if decreasing an integer node by one would reduce
1599 # the size of the generated input, we'll try deleting things after that
1600 # node and see if the resulting attempt works.
1601
1602 if node.type != "integer":
1603 # Only try this fixup logic on integer draws. Almost all size
1604 # dependencies are on integer draws, and if it's not, it's doing
1605 # something convoluted enough that it is unlikely to shrink well anyway.
1606 # TODO: extent to floats? we probably currently fail on the following,
1607 # albeit convoluted example:
1608 # n = int(data.draw(st.floats()))
1609 # s = data.draw(st.lists(st.integers(), min_size=n, max_size=n))
1610 return
1611
1612 lowered = (
1613 self.nodes[: node.index]
1614 + (node.copy(with_value=node.value - 1),)
1615 + self.nodes[node.index + 1 :]
1616 )
1617 attempt = self.cached_test_function(lowered)[1]
1618 if (
1619 attempt is None
1620 or attempt.status < Status.VALID
1621 or len(attempt.nodes) == len(self.nodes)
1622 or len(attempt.nodes) == node.index + 1
1623 ):
1624 # no point in trying our size-dependency-logic if our attempt at
1625 # lowering the node resulted in:
1626 # * an invalid conjecture data
1627 # * the same number of nodes as before
1628 # * no nodes beyond the lowered node (nothing to try to delete afterwards)
1629 return
1630
1631 # If it were then the original shrink should have worked and we could
1632 # never have got here.
1633 assert attempt is not self.shrink_target
1634
1635 @self.cached(node.index)
1636 def first_span_after_node():
1637 lo = 0
1638 hi = len(self.spans)
1639 while lo + 1 < hi:
1640 mid = (lo + hi) // 2
1641 ex = self.spans[mid]
1642 if ex.start >= node.index:
1643 hi = mid
1644 else:
1645 lo = mid
1646 return hi
1647
1648 # we try deleting both entire spans, and single nodes.
1649 # If we wanted to get more aggressive, we could try deleting n
1650 # consecutive nodes (that don't cross a span boundary) for say
1651 # n <= 2 or n <= 3.
1652 if chooser.choose([True, False]):
1653 ex = self.spans[
1654 chooser.choose(
1655 range(first_span_after_node, len(self.spans)),
1656 lambda i: self.spans[i].choice_count > 0,
1657 )
1658 ]
1659 self.consider_new_nodes(lowered[: ex.start] + lowered[ex.end :])
1660 else:
1661 node = self.nodes[chooser.choose(range(node.index + 1, len(self.nodes)))]
1662 self.consider_new_nodes(lowered[: node.index] + lowered[node.index + 1 :])
1663
1664 def reorder_spans(self, chooser):
1665 """This pass allows us to reorder the children of each span.
1666
1667 For example, consider the following:
1668
1669 .. code-block:: python
1670
1671 import hypothesis.strategies as st
1672 from hypothesis import given
1673
1674
1675 @given(st.text(), st.text())
1676 def test_not_equal(x, y):
1677 assert x != y
1678
1679 Without the ability to reorder x and y this could fail either with
1680 ``x=""``, ``y="0"``, or the other way around. With reordering it will
1681 reliably fail with ``x=""``, ``y="0"``.
1682 """
1683 ex = chooser.choose(self.spans)
1684 label = chooser.choose(ex.children).label
1685
1686 spans = [c for c in ex.children if c.label == label]
1687 if len(spans) <= 1:
1688 return
1689 st = self.shrink_target
1690 endpoints = [(ex.start, ex.end) for ex in spans]
1691
1692 Ordering.shrink(
1693 range(len(spans)),
1694 lambda indices: self.consider_new_nodes(
1695 replace_all(
1696 st.nodes,
1697 [
1698 (
1699 u,
1700 v,
1701 st.nodes[spans[i].start : spans[i].end],
1702 )
1703 for (u, v), i in zip(endpoints, indices)
1704 ],
1705 )
1706 ),
1707 key=lambda i: sort_key(st.nodes[spans[i].start : spans[i].end]),
1708 )
1709
1710 def run_node_program(self, i, program, original, repeats=1):
1711 """Node programs are a mini-DSL for node rewriting, defined as a sequence
1712 of commands that can be run at some index into the nodes
1713
1714 Commands are:
1715
1716 * "X", delete this node
1717
1718 This method runs the node program in ``program`` at node index
1719 ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it
1720 will attempt to approximate the results of running it that many times.
1721
1722 Returns True if this successfully changes the underlying shrink target,
1723 else False.
1724 """
1725 if i + len(program) > len(original.nodes) or i < 0:
1726 return False
1727 attempt = list(original.nodes)
1728 for _ in range(repeats):
1729 for k, command in reversed(list(enumerate(program))):
1730 j = i + k
1731 if j >= len(attempt):
1732 return False
1733
1734 if command == "X":
1735 del attempt[j]
1736 else:
1737 raise NotImplementedError(f"Unrecognised command {command!r}")
1738
1739 return self.consider_new_nodes(attempt)